Module data_utils.data_processing
Expand source code
from comet_ml import Experiment # Comet.ml can log training metrics, parameters, do version control and parameter optimization
import torch # PyTorch to create and apply deep learning models
import dask.dataframe as dd # Dask to handle big data in dataframes
import math # Some mathematical operations
import numpy as np # NumPy to handle numeric and NaN operations
import numbers # numbers allows checking if data is numeric
import warnings # Print warnings for bad practices
from functools import partial # Enables using functions with some fixed parameters
from tqdm.auto import tqdm # tqdm allows tracking code execution progress
from glob import glob # Find files by name
from . import utils # Generic and useful methods
from . import search_explore # Methods to search and explore data
import data_utils as du
# Pandas to handle the data in dataframes
if du.use_modin is True:
import modin.pandas as pd
else:
import pandas as pd
# Ignore Dask's 'meta' warning
warnings.filterwarnings("ignore", message="`meta` is not specified, inferred from partial data. Please provide `meta` if the result is unexpected.")
# Methods
def get_clean_label(orig_label, clean_labels, column_name=None):
'''Gets the clean version of a given label.
Parameters
----------
orig_label : string
Original label name that needs to be converted to the new format.
clean_labels : dict
Dictionary that converts each original label into a new, cleaner designation.
column_name : string, default None
Optional parameter to indicate a column name, which is used to specify better the
missing values.
Returns
-------
key : string
Returns the dictionary key from clean_labels that corresponds to the translation
given to the input label orig_label.
'''
for key in clean_labels:
if orig_label in clean_labels[key]:
return key
# Remaining labels (or lack of one) are considered as missing data
if column_name is not None:
return f'{column_name}_missing_value'
else:
return 'missing_value'
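# Illustrative usage sketch (not part of the original module; `labels_map` is a
# made-up dictionary mapping each clean label to its possible original spellings):
#     labels_map = {'male': ['M', 'Male', 'm'], 'female': ['F', 'Female', 'f']}
#     get_clean_label('Male', labels_map)                       # -> 'male'
#     get_clean_label('N/A', labels_map, column_name='gender')  # -> 'gender_missing_value'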
def rename_index(df, name):
'''Renames the dataframe's index to a desired name. Especially important
for Dask dataframes, which don't offer an elegant, one-line method
for this.
Parameters
----------
df : pandas.DataFrame or dask.DataFrame
Dataframe whose index column will be renamed.
name : string
The new name for the index column.
Returns
-------
df : pandas.DataFrame or dask.DataFrame
Dataframe with a renamed index column.
'''
if isinstance(df, dd.DataFrame):
feat_names = set(df.columns)
df = df.reset_index()
orig_idx_name = set(df.columns) - feat_names
orig_idx_name = orig_idx_name.pop()
df = df.rename(columns={orig_idx_name: name})
df = df.set_index(name)
elif isinstance(df, pd.DataFrame):
df.index.names = [name]
else:
raise Exception(f'ERROR: Input "df" should either be a pandas dataframe or a dask dataframe, not type {type(df)}.')
return df
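# Illustrative usage sketch (not part of the original module; toy dataframe made up on the spot):
#     df = pd.DataFrame({'heart_rate': [80, 90]}, index=[101, 102])
#     df = rename_index(df, 'patientunitstayid')
#     df.index.name   # -> 'patientunitstayid'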
def standardize_missing_values(x, specific_nan_strings=[]):
'''Apply function to be used in replacing missing value representations with
the standard NumPy NaN value.
Parameters
----------
x : str, int or float
Value to be analyzed and replaced with NaN, if it has a missing value
representation.
specific_nan_strings : list of strings, default []
Parameter where the user can specify additional strings that
should correspond to missing values.
Returns
-------
x : str, int or float
Corrected value, with standardized missing value representation.
'''
if isinstance(x, str):
if utils.is_string_nan(x, specific_nan_strings):
return np.nan
else:
return x
else:
return x
def standardize_missing_values_df(df, see_progress=True, specific_nan_strings=[]):
'''Replace all elements in a dataframe that have a missing value
representation with the standard NumPy NaN value.
Parameters
----------
df : pandas.DataFrame or dask.DataFrame
Dataframe to be analyzed and have its content replaced with NaN,
wherever a missing value representation is found.
see_progress : bool, default True
If set to True, a progress bar will show up indicating the execution
progress of the missing value replacement.
specific_nan_strings : list of strings, default []
Parameter where the user can specify additional strings that
should correspond to missing values.
Returns
-------
df : pandas.DataFrame or dask.DataFrame
Corrected dataframe, with standardized missing value representation.
'''
for feature in utils.iterations_loop(df.columns, see_progress=see_progress):
if isinstance(df, dd.DataFrame):
df[feature] = df[feature].apply(lambda x: standardize_missing_values(x, specific_nan_strings),
meta=df[feature]._meta.dtypes)
elif isinstance(df, pd.DataFrame):
df[feature] = df[feature].apply(lambda x: standardize_missing_values(x, specific_nan_strings))
else:
raise Exception(f'ERROR: Input "df" should either be a pandas dataframe or a dask dataframe, not type {type(df)}.')
return df
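# Illustrative usage sketch (not part of the original module). Assumes that
# utils.is_string_nan recognizes the strings passed in `specific_nan_strings`:
#     df = pd.DataFrame({'drug': ['aspirin', 'unknown', 'heparin']})
#     df = standardize_missing_values_df(df, specific_nan_strings=['unknown'])
#     # the 'unknown' entry is replaced with np.nan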
def remove_cols_with_many_nans(df, nan_percent_thrsh=40, inplace=False):
'''Remove columns that have too many NaN's (missing values).
Parameters
----------
df : pandas.DataFrame or dask.DataFrame
Dataframe that will be processed, to remove columns with high
percentages of missing values.
nan_percent_thrsh : int or float, default 40
Percentage threshold of missing values above which a column is
considered to have too many missing values and is removed. Measured
on a 0-100 scale.
inplace : bool, default False
If set to True, the original dataframe will be used and modified
directly. Otherwise, a copy will be created and returned, without
changing the original dataframe.
Returns
-------
df : pandas.DataFrame or dask.DataFrame
Corrected dataframe, with columns removed that had too many
missing values.
'''
if not inplace:
# Make a copy of the data to avoid potentially unwanted changes to the original dataframe
data_df = df.copy()
else:
# Use the original dataframe
data_df = df
# Find each column's missing values percentage
nan_percent_df = search_explore.dataframe_missing_values(data_df)
# Remove columns that exceed the missing values percentage threshold
many_nans_cols = list(nan_percent_df[nan_percent_df.percent_missing > nan_percent_thrsh].column_name)
data_df = data_df.drop(many_nans_cols, axis = 1)
return data_df
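# Illustrative usage sketch (not part of the original module; relies on
# search_explore.dataframe_missing_values to compute the per-column percentages):
#     df = pd.DataFrame({'a': [1, 2, 3, 4],
#                        'b': [np.nan, np.nan, np.nan, 4]})
#     df = remove_cols_with_many_nans(df, nan_percent_thrsh=40)
#     # column 'b' (75% missing) is dropped, column 'a' is kept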
def clean_naming(x, lower_case=True):
'''Change strings to only have lower case letters and underscores.
Parameters
----------
x : string or list of strings
String(s) on which to clean the naming, standardizing it.
lower_case : bool, default True
If set to True, all strings will be converted to lower case.
Returns
-------
x : string or list of strings
Cleaned string(s).
'''
if 'pandas.core.indexes.base.Index' in str(type(x)):
# If the user input is a dataframe index (e.g. df.columns), convert it to a list
x = list(x)
if isinstance(x, list):
if lower_case is True:
x = [string.lower().replace('  ', '')
.replace(' ', '_')
.replace(',', '_and') for string in x]
else:
x = [string.replace('  ', '')
.replace(' ', '_')
.replace(',', '_and') for string in x]
elif (isinstance(x, pd.DataFrame)
or isinstance(x, pd.Series)
or isinstance(x, dd.DataFrame)
or isinstance(x, dd.Series)):
raise Exception('ERROR: Wrong method. When using dataframes or series, use clean_categories_naming() method instead.')
else:
if lower_case is True:
x = (str(x).lower().replace('  ', '')
.replace(' ', '_')
.replace(',', '_and'))
else:
x = (str(x).replace('  ', '')
.replace(' ', '_')
.replace(',', '_and'))
return x
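# Illustrative usage sketch (not part of the original module):
#     clean_naming('Heart Rate, Mean')               # -> 'heart_rate_and_mean'
#     clean_naming(['Heart Rate', 'O2 Saturation'])  # -> ['heart_rate', 'o2_saturation']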
def clean_categories_naming(df, column, clean_missing_values=True,
specific_nan_strings=[], lower_case=False):
'''Change categorical values to only have lower case letters and underscores.
Parameters
----------
df : pandas.DataFrame or dask.DataFrame
Dataframe that contains the column to be cleaned.
column : string
Name of the dataframe's column which needs to have its string values
standardized.
clean_missing_values : bool, default True
If set to True, the algorithm will search for missing value
representations and replace them with the standard, NumPy NaN value.
specific_nan_strings : list of strings, default []
Parameter where the user can specify additional strings that
should correspond to missing values.
lower_case : bool, default False
If set to True, all strings will be converted to lower case.
Returns
-------
df : pandas.DataFrame or dask.DataFrame
Dataframe with its string column already cleaned.
'''
# Fix the setting of all lower case characters according to the `lower_case` parameter
clean_naming_prtl = partial(clean_naming, lower_case=lower_case)
if isinstance(df, dd.DataFrame):
df[column] = (df[column].map(clean_naming_prtl, meta=('x', str)))
if clean_missing_values is True:
df[column] = df[column].apply(lambda x: standardize_missing_values(x, specific_nan_strings),
meta=df[column]._meta.dtypes)
else:
df[column] = (df[column].map(clean_naming_prtl))
if clean_missing_values is True:
df[column] = df[column].apply(lambda x: standardize_missing_values(x, specific_nan_strings))
return df
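# Illustrative usage sketch (not part of the original module). Assumes that
# utils.is_string_nan matches the strings given in `specific_nan_strings`:
#     df = pd.DataFrame({'drug': ['Aspirin 100mg', 'Not Obtainable']})
#     df = clean_categories_naming(df, 'drug', specific_nan_strings=['Not_Obtainable'])
#     # -> 'drug' becomes ['Aspirin_100mg', np.nan]
#     # Note that the naming cleanup runs before the missing value check, so the
#     # NaN string has to be given in its already cleaned form ('Not_Obtainable').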
def one_hot_encoding_dataframe(df, columns, clean_name=True, clean_missing_values=True,
specific_nan_strings=[], lower_case=False,
has_nan=False, join_rows=False,
join_by=['patientunitstayid', 'ts'],
get_new_column_names=False,
search_by_dtypes=False, inplace=False):
'''Transforms specified column(s) from a dataframe into a one hot encoding
representation.
Parameters
----------
df : pandas.DataFrame or dask.DataFrame
Dataframe that will be used, which contains the specified column.
columns : list of strings
Name of the column(s) that will be converted to one hot encoding.
clean_name : bool, default True
If set to True, standardizes the names of the categorical values, with
words separated by underscores instead of spaces (and converted to
lower case if `lower_case` is set to True).
clean_missing_values : bool, default True
If set to True, the algorithm will search for missing value
representations and replace them with the standard, NumPy NaN value.
specific_nan_strings : list of strings, default []
Parameter where the user can specify additional strings that
should correspond to missing values.
lower_case : bool, default False
If set to True, all strings will be converted to lower case.
has_nan : bool, default False
If set to True, will first fill the missing values (NaN) with the string
'missing_value', which the one hot encoding then turns into a
f'{column}_missing_value' column.
join_rows : bool, default False
If set to true, will group the rows created by the one hot encoding by
summing the boolean values in the rows that have the same identifiers.
join_by : string or list of strings, default ['patientunitstayid', 'ts']
Name of the column (or columns) which serves as a unique identifier of
the dataframe's rows, which will be used in the groupby operation if the
parameter join_rows is set to true. Can be a string (single column) or a
list of strings (multiple columns).
get_new_column_names : bool, default False
If set to True, the names of the new columns will also be output.
search_by_dtypes : bool, default False
If set to True, the method will only look for boolean columns based on
their data type. This is only reliable if all the columns' data types
have been properly set.
inplace : bool, default False
If set to True, the original dataframe will be used and modified
directly. Otherwise, a copy will be created and returned, without
changing the original dataframe.
Raises
------
ColumnNotFoundError
Column name not found in the dataframe.
Returns
-------
ohe_df : pandas.DataFrame or dask.DataFrame
Returns a new dataframe with the specified column in a one hot encoding
representation.
new_column_names : list of strings
List of the new, one hot encoded columns' names.
'''
if not inplace:
# Make a copy of the data to avoid potentially unwanted changes to the original dataframe
data_df = df.copy()
else:
# Use the original dataframe
data_df = df
# Make sure that the columns is a list
if isinstance(columns, str):
columns = [columns]
if not isinstance(columns, list):
raise Exception(f'ERROR: The `columns` argument must be specified as either a single string or a list of strings. Received input with type {type(columns)}.')
print('Cleaning the categorical columns...')
for col in utils.iterations_loop(columns):
# Check if the column exists
if col not in data_df.columns:
raise Exception(f'ERROR: Column {col} not found in the dataframe.')
if clean_name is True:
# Clean the column's string values to have the same, standard format
data_df = clean_categories_naming(data_df, col, clean_missing_values,
specific_nan_strings, lower_case)
if has_nan is True:
# Fill NaN with "missing_value" name
data_df[col] = data_df[col].fillna(value='missing_value')
# Cast the variable into the built in pandas Categorical data type
if isinstance(data_df, pd.DataFrame):
data_df[col] = pd.Categorical(data_df[col])
if isinstance(data_df, dd.DataFrame):
data_df = data_df.categorize(columns)
if get_new_column_names is True:
# Find the previously existing column names
old_column_names = data_df.columns
print('Getting dummies...')
# Apply the one hot encoding to the specified columns
if isinstance(data_df, dd.DataFrame):
ohe_df = dd.get_dummies(data_df, columns=columns)
else:
ohe_df = pd.get_dummies(data_df, columns=columns)
if join_rows is True:
# Columns which are one hot encoded
ohe_columns = search_explore.list_boolean_columns(ohe_df, search_by_dtypes=search_by_dtypes)
# Group the rows that have the same identifiers
ohe_df = ohe_df.groupby(join_by).sum(min_count=1).reset_index()
# Clip the one hot encoded columns to a maximum value of 1
# (there might be duplicates which cause values bigger than 1)
ohe_df.loc[:, ohe_columns] = ohe_df[ohe_columns].clip(upper=1)
print('Done!')
if get_new_column_names is True:
# Find the new column names and output them
new_column_names = list(set(ohe_df.columns) - set(old_column_names))
new_column_names.sort()
return ohe_df, new_column_names
else:
return ohe_df
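# Illustrative usage sketch (not part of the original module; toy dataframe made up on the spot):
#     df = pd.DataFrame({'patientunitstayid': [1, 1, 2], 'ts': [0, 1, 0],
#                        'drug': ['Aspirin', 'Heparin', 'Aspirin']})
#     ohe_df, new_cols = one_hot_encoding_dataframe(df, columns='drug',
#                                                   get_new_column_names=True)
#     # new_cols -> ['drug_Aspirin', 'drug_Heparin'] (with the default
#     # lower_case=False, the original casing is kept)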
def category_to_feature(df, categories_feature, values_feature, min_len=None,
see_progress=True, inplace=False):
'''Convert a categorical column and its corresponding values column into
new features, one for each category.
WARNING: Currently not working properly on a Dask dataframe. Apply .compute()
to the dataframe to convert it to Pandas, before passing it to this method.
If the data is too big to run on Pandas, use the category_to_feature_big_data
method.
Parameters
----------
df : pandas.DataFrame or dask.DataFrame
Dataframe on which to add the new features.
categories_feature : string
Name of the feature that contains the categories that will be converted
to individual features.
values_feature : string
Name of the feature that has each category's corresponding value, which
may or may not be a category on its own (e.g. it could be numeric values).
min_len : int, default None
If defined, only the categories that appear on at least `min_len` rows
are converted to features.
see_progress : bool, default True
If set to True, a progress bar will show up indicating the execution
progress of the category conversion.
inplace : bool, default False
If set to True, the original dataframe will be used and modified
directly. Otherwise, a copy will be created and returned, without
changing the original dataframe.
Returns
-------
data_df : pandas.DataFrame or dask.DataFrame
Dataframe with the newly created features.
'''
if not inplace:
# Make a copy of the data to avoid potentially unwanted changes to the original dataframe
data_df = df.copy()
else:
# Use the original dataframe
data_df = df
# Find the unique categories
categories = data_df[categories_feature].unique()
if isinstance(df, dd.DataFrame):
categories = categories.compute()
# Create a feature for each category
for category in utils.iterations_loop(categories, see_progress=see_progress):
if min_len is not None:
# Check if the current category has enough data to be worth it to convert to a feature
if len(data_df[data_df[categories_feature] == category]) < min_len:
# Ignore the current category
continue
# Convert category to feature
data_df[category] = data_df.apply(lambda x: x[values_feature] if x[categories_feature] == category
else np.nan, axis=1)
return data_df
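# Illustrative usage sketch (not part of the original module; toy dataframe made up on the spot):
#     df = pd.DataFrame({'lab': ['glucose', 'sodium', 'glucose'],
#                        'value': [5.4, 140.0, 6.1]})
#     df = category_to_feature(df, categories_feature='lab', values_feature='value')
#     # adds a 'glucose' and a 'sodium' column, holding each row's 'value' where
#     # the row's 'lab' matches the category and NaN everywhere else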
def category_to_feature_big_data(df, categories_feature, values_feature,
min_len=None, see_progress=True):
'''Convert a categorical column and its corresponding values column into
new features, one for each category. Optimized for very big Dask dataframes,
which can't be processed as a whole Pandas dataframe.
Parameters
----------
df : dask.DataFrame
Dataframe on which to add the new features.
categories_feature : string
Name of the feature that contains the categories that will be converted
to individual features.
values_feature : string
Name of the feature that has each category's corresponding value, which
may or may not be a category on its own (e.g. it could be numeric values).
min_len : int, default None
If defined, only the categories that appear on at least `min_len` rows
are converted to features.
see_progress : bool, default True
If set to True, a progress bar will show up indicating the execution
progress of the category conversion.
Returns
-------
data_df : dask.DataFrame
Dataframe with the newly created features.
'''
# Create a list with Pandas dataframe versions of each partition of the
# original Dask dataframe
df_list = []
print('Converting categories to features in each partition...')
for n in utils.iterations_loop(range(df.npartitions), see_progress=see_progress):
# Process each partition separately in Pandas
tmp_df = df.get_partition(n).compute()
tmp_df = category_to_feature(tmp_df, categories_feature=categories_feature,
values_feature=values_feature, min_len=min_len,
see_progress=see_progress)
df_list.append(tmp_df)
# Rejoin all the partitions into a Dask dataframe with the same number of
# partitions it originally had
print('Rejoining partitions into a Dask dataframe...')
data_df = dd.from_pandas(pd.concat(df_list, sort=False), npartitions=df.npartitions)
print('Done!')
return data_df
def remove_rows_unmatched_key(df, key, columns):
'''Remove rows corresponding to the keys that weren't in the dataframe merged at the right.
Parameters
----------
df : pandas.DataFrame or dask.DataFrame
Dataframe resulting from an asof merge which will be searched for missing values.
key : string
Name of the column which was used as the "by" key in the asof merge. Typically
represents a temporal feature from a time series, such as days or timestamps.
columns : list of strings
Name of the column(s), originating from the dataframe which was merged at the
right, which should not have any missing values. If they do, it means that
the corresponding key wasn't present in the right dataframe. Even if there's
just one column to analyze, it should be passed in a list.
Returns
-------
df : pandas.DataFrame or dask.DataFrame
Returns the input dataframe but without the rows which didn't have any values
in the right dataframe's features.
'''
for k in utils.iterations_loop(df[key].unique()):
# Variable that counts the number of columns which don't have any value
# (i.e. all rows are missing values) for a given identifier 'k'
num_empty_columns = 0
for col in columns:
if df[df[key] == k][col].isnull().sum() == len(df[df[key] == k]):
# Found one more column which is full of missing values for identifier 'k'
num_empty_columns += 1
if num_empty_columns == len(columns):
# Eliminate all rows corresponding to the analysed key if all the columns
# are empty for the identifier 'k'
df = df[~(df[key] == k)]
return df
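# Illustrative usage sketch (not part of the original module), e.g. after an asof
# merge that brought in the 'hr' and 'bp' columns from the right dataframe:
#     df = pd.DataFrame({'patientunitstayid': [1, 1, 2],
#                        'hr': [80.0, 82.0, np.nan],
#                        'bp': [120.0, 118.0, np.nan]})
#     df = remove_rows_unmatched_key(df, key='patientunitstayid', columns=['hr', 'bp'])
#     # rows with patientunitstayid == 2 are dropped, since both 'hr' and 'bp'
#     # are entirely missing for that key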
def apply_zscore_norm(value, df=None, mean=None, std=None, categories_means=None,
categories_stds=None, groupby_columns=None):
'''Performs z-score normalization when used inside a Pandas or Dask
apply function.
Parameters
----------
value : int or float
Original, unnormalized value.
df : pandas.DataFrame or dask.DataFrame, default None
Original dataframe (or, when used inside a row-wise apply, the current
row) from which the category values are read, so as to look up the
corresponding statistics in group normalization, i.e. when values are
normalized according to their corresponding categories.
mean : int or float, default None
Average (mean) value to be used in the z-score normalization.
std : int or float, default None
Standard deviation value to be used in the z-score normalization.
categories_means : dict, default None
Dictionary containing the average values for each set of categories.
categories_stds : dict, default None
Dictionary containing the standard deviation values for each set of
categories.
groupby_columns : string or list of strings, default None
Name(s) of the column(s) that contains the categories from which
statistical values (mean and standard deviation) are retrieved.
Returns
-------
value_norm : int or float
Z-score normalized value.
'''
if not isinstance(value, numbers.Number):
raise Exception(f'ERROR: Input value should be a number, not an object of type {type(value)}.')
if mean is not None and std is not None:
return (value - mean) / std
elif (df is not None and categories_means is not None
and categories_stds is not None and groupby_columns is not None):
try:
if isinstance(groupby_columns, list):
return ((value - categories_means[tuple(df[groupby_columns])])
/ categories_stds[tuple(df[groupby_columns])])
else:
return ((value - categories_means[df[groupby_columns]])
/ categories_stds[df[groupby_columns]])
except Exception:
warnings.warn(f'Couldn\'t manage to find the mean and standard deviation values for the groupby columns {groupby_columns} with values {tuple(df[groupby_columns])}.')
return np.nan
else:
raise Exception('ERROR: Invalid parameters. Either the `mean` and `std` or the `df`, `categories_means`, `categories_stds` and `groupby_columns` must be set.')
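# Illustrative usage sketch (not part of the original module). The simple case
# only needs the precomputed stats; the group case is meant for a row-wise apply,
# with `means_by_unit` / `stds_by_unit` being made-up dictionaries obtained from
# a groupby:
#     apply_zscore_norm(75, mean=70, std=5)   # -> 1.0
#     means_by_unit = df.groupby('unit')['hr'].mean().to_dict()
#     stds_by_unit = df.groupby('unit')['hr'].std().to_dict()
#     df['hr_norm'] = df.apply(lambda row: apply_zscore_norm(row['hr'], df=row,
#                                                            categories_means=means_by_unit,
#                                                            categories_stds=stds_by_unit,
#                                                            groupby_columns='unit'),
#                              axis=1)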
def apply_minmax_norm(value, df=None, min=None, max=None, categories_mins=None,
categories_maxs=None, groupby_columns=None):
'''Performs minmax normalization when used inside a Pandas or Dask
apply function.
Parameters
----------
value : int or float
Original, unnormalized value.
df : pandas.DataFrame or dask.DataFrame, default None
Original dataframe (or, when used inside a row-wise apply, the current
row) from which the category values are read, so as to look up the
corresponding statistics in group normalization, i.e. when values are
normalized according to their corresponding categories.
min : int or float, default None
Minimum value to be used in the minmax normalization.
max : int or float, default None
Maximum value to be used in the minmax normalization.
categories_mins : dict, default None
Dictionary containing the minimum values for each set of categories.
categories_maxs : dict, default None
Dictionary containing the maximum values for each set of categories.
groupby_columns : string or list of strings, default None
Name(s) of the column(s) that contains the categories from which
statistical values (minimum and maximum) are retrieved.
Returns
-------
value_norm : int or float
Minmax normalized value.
'''
if not isinstance(value, numbers.Number):
raise Exception(f'ERROR: Input value should be a number, not an object of type {type(value)}.')
if min is not None and max is not None:
return (value - min) / (max - min)
elif (df is not None and categories_mins is not None
and categories_maxs is not None and groupby_columns is not None):
try:
if isinstance(groupby_columns, list):
return ((value - categories_mins[tuple(df[groupby_columns])])
/ (categories_maxs[tuple(df[groupby_columns])] - categories_mins[tuple(df[groupby_columns])]))
else:
return ((value - categories_mins[df[groupby_columns]])
/ (categories_maxs[df[groupby_columns]] - categories_mins[df[groupby_columns]]))
except Exception:
warnings.warn(f'Couldn\'t manage to find the minimum and maximum values for the groupby columns {groupby_columns} with values {tuple(df[groupby_columns])}.')
return np.nan
else:
raise Exception('ERROR: Invalid parameters. Either the `min` and `max` or the `df`, `categories_mins`, `categories_maxs` and `groupby_columns` must be set.')
def apply_zscore_denorm(value, df=None, mean=None, std=None, categories_means=None,
categories_stds=None, groupby_columns=None):
'''Performs z-score denormalization when used inside a Pandas or Dask
apply function.
Parameters
----------
value : int or float
Input normalized value.
df : pandas.DataFrame or dask.DataFrame, default None
Original dataframe (or, when used inside a row-wise apply, the current
row) from which the category values are read, so as to look up the
corresponding statistics in group denormalization, i.e. when values are
denormalized according to their corresponding categories.
mean : int or float, default None
Average (mean) value to be used in the z-score denormalization.
std : int or float, default None
Standard deviation value to be used in the z-score denormalization.
categories_means : dict, default None
Dictionary containing the average values for each set of categories.
categories_stds : dict, default None
Dictionary containing the standard deviation values for each set of
categories.
groupby_columns : string or list of strings, default None
Name(s) of the column(s) that contains the categories from which
statistical values (mean and standard deviation) are retrieved.
Returns
-------
value_denorm : int or float
Z-score denormalized value.
'''
if not isinstance(value, numbers.Number):
raise Exception(f'ERROR: Input value should be a number, not an object of type {type(value)}.')
if mean is not None and std is not None:
return value * std + mean
elif (df is not None and categories_means is not None
and categories_stds is not None and groupby_columns is not None):
try:
if isinstance(groupby_columns, list):
return (value * categories_stds[tuple(df[groupby_columns])]
+ categories_means[tuple(df[groupby_columns])])
else:
return (value * categories_stds[df[groupby_columns]]
+ categories_means[df[groupby_columns]])
except Exception:
warnings.warn(f'Couldn\'t manage to find the mean and standard deviation values for the groupby columns {groupby_columns} with values {tuple(df[groupby_columns])}.')
return np.nan
else:
raise Exception('ERROR: Invalid parameters. Either the `mean` and `std` or the `df`, `categories_means`, `categories_stds` and `groupby_columns` must be set.')
def apply_minmax_denorm(value, df=None, min=None, max=None, categories_mins=None,
categories_maxs=None, groupby_columns=None):
'''Performs minmax denormalization when used inside a Pandas or Dask
apply function.
Parameters
----------
value : int or float
Input normalized value.
df : pandas.DataFrame or dask.DataFrame, default None
Original dataframe (or, when used inside a row-wise apply, the current
row) from which the category values are read, so as to look up the
corresponding statistics in group denormalization, i.e. when values are
denormalized according to their corresponding categories.
min : int or float, default None
Minimum value to be used in the minmax denormalization.
max : int or float, default None
Maximum value to be used in the minmax denormalization.
categories_mins : dict, default None
Dictionary containing the minimum values for each set of categories.
categories_maxs : dict, default None
Dictionary containing the maximum values for each set of categories.
groupby_columns : string or list of strings, default None
Name(s) of the column(s) that contains the categories from which
statistical values (minimum and maximum) are retrieved.
Returns
-------
value_denorm : int or float
Minmax denormalized value.
'''
if not isinstance(value, numbers.Number):
raise Exception(f'ERROR: Input value should be a number, not an object of type {type(value)}.')
if min is not None and max is not None:
return value * (max - min) + min
elif (df is not None and categories_mins is not None
and categories_maxs is not None and groupby_columns is not None):
try:
if isinstance(groupby_columns, list):
return (value * (categories_maxs[tuple(df[groupby_columns])]
- categories_mins[tuple(df[groupby_columns])])
+ categories_mins[tuple(df[groupby_columns])])
else:
return (value * (categories_maxs[df[groupby_columns]]
- categories_mins[df[groupby_columns]])
+ categories_mins[df[groupby_columns]])
except Exception:
warnings.warn(f'Couldn\'t manage to find the minimum and maximum values for the groupby columns {groupby_columns} with values {tuple(df[groupby_columns])}.')
return np.nan
else:
raise Exception('ERROR: Invalid parameters. Either the `min` and `max` or the `df`, `categories_mins`, `categories_maxs` and `groupby_columns` must be set.')
def normalize_data(df, data=None, id_columns=['patientunitstayid', 'ts'],
normalization_method='z-score', columns_to_normalize=None,
columns_to_normalize_categ=None, categ_columns=None,
see_progress=True, get_stats=False,
search_by_dtypes=False, inplace=False):
'''Performs data normalization to a continuous valued tensor or dataframe,
changing the scale of the data.
Parameters
----------
df : pandas.DataFrame or dask.DataFrame
Original Pandas or Dask dataframe which is used to correctly calculate the
necessary statistical values used in the normalization. These values
can't be calculated from the tensor as it might have been padded. If
the data tensor isn't specified, the normalization is applied directly
on the dataframe.
data : torch.Tensor, default None
PyTorch tensor corresponding to the data which will be normalized
by the specified normalization method. If the data tensor isn't
specified, the normalization is applied directly on the dataframe.
id_columns : string or list of strings, default ['patientunitstayid', 'ts']
List of columns names which represent identifier columns. These are not
supposed to be normalized.
normalization_method : string, default 'z-score'
Specifies the normalization method used. It can be a z-score
normalization, where the data has its mean subtracted and is divided
by the standard deviation, which makes it have zero average and unit
variance, much like a standard normal distribution; or it can be a
min-max normalization, where the data has its minimum value subtracted
and is then divided by the difference between the maximum and the
minimum value, ending up in a fixed range from 0 to 1.
columns_to_normalize : string or list of strings, default None
If specified, the columns provided in the list are the only ones that
will be normalized. If set to False, no column will be normalized directly,
although columns can still be normalized in groups of categories, if
specified in the `columns_to_normalize_categ` parameter. Otherwise, all
continuous columns will be normalized.
columns_to_normalize_categ : tuple or list of tuples, default None
If specified, the columns provided in the list are going to be
normalized on their categories. That is, the values (column 2 in the
tuple) are normalized with stats of their respective categories (column
1 of the tuple). Otherwise, no column will be normalized on their
categories.
categ_columns : string or list of strings, default None
If specified, the columns in the list, which represent categorical
features, which either are a label or will be embedded, aren't
going to be normalized.
see_progress : bool, default True
If set to True, a progress bar will show up indicating the execution
of the normalization calculations.
get_stats : bool, default False
If set to True, the stats used to normalize the data (e.g. mean and
standard deviation) are also output.
search_by_dtypes : bool, default False
If set to True, the method will only look for boolean columns based on
their data type. This is only reliable if all the columns' data types
have been properly set.
inplace : bool, default False
If set to True, the original dataframe will be used and modified
directly. Otherwise, a copy will be created and returned, without
changing the original dataframe.
Returns
-------
data : pandas.DataFrame or dask.DataFrame or torch.Tensor
Normalized Pandas or Dask dataframe or PyTorch tensor.
If get_stats == True and normalization_method == 'z-score':
mean : float or dict or list of floats or list of dicts
Mean value(s) used in the data normalization.
std : float or dict or list of floats or list of dicts
Standard deviation value(s) used in the data normalization.
If get_stats == True and normalization_method == 'min-max':
min : dict
Minimum value(s) used in the data normalization.
max : dict
Maximum value(s) used in the data normalization.
'''
# Check if specific columns have been specified for normalization
if columns_to_normalize is None:
# List of all columns in the dataframe
feature_columns = list(df.columns)
# Normalize all non identifier continuous columns, ignore one hot encoded ones
columns_to_normalize = feature_columns
if id_columns is not None:
# Make sure that the id_columns is a list
if isinstance(id_columns, str):
id_columns = [id_columns]
if not isinstance(id_columns, list):
raise Exception(f'ERROR: The `id_columns` argument must be specified as either a single string or a list of strings. Received input with type {type(id_columns)}.')
# List of all columns in the dataframe, except the ID columns
[columns_to_normalize.remove(col) for col in id_columns]
if categ_columns is not None:
# Make sure that the categ_columns is a list
if isinstance(categ_columns, str):
categ_columns = [categ_columns]
if not isinstance(categ_columns, list):
raise Exception(f'ERROR: The `categ_columns` argument must be specified as either a single string or a list of strings. Received input with type {type(categ_columns)}.')
# Prevent all features that will be embedded from being normalized
[columns_to_normalize.remove(col) for col in categ_columns]
# List of boolean or one hot encoded columns
boolean_cols = search_explore.list_boolean_columns(df[columns_to_normalize], search_by_dtypes=search_by_dtypes)
if boolean_cols is not None:
# Prevent boolean features from being normalized
[columns_to_normalize.remove(col) for col in boolean_cols]
# Remove all non numeric columns that could be left
columns_to_normalize = [col for col in columns_to_normalize
if df[col].dtype == int or df[col].dtype == float]
if len(columns_to_normalize) == 0:
print('No columns to normalize, returning the original dataframe.')
return df
# Make sure that the columns_to_normalize is a list
if isinstance(columns_to_normalize, str):
columns_to_normalize = [columns_to_normalize]
if not isinstance(columns_to_normalize, list) and not isinstance(columns_to_normalize, bool):
raise Exception(f'ERROR: The `columns_to_normalize` argument must be specified as either a single string, a list of strings or a boolean. Received input with type {type(columns_to_normalize)}.')
if type(normalization_method) is not str:
raise ValueError('Argument normalization_method should be a string. Available options are "z-score" and "min-max".')
if normalization_method.lower() == 'z-score':
if columns_to_normalize is not False:
# Calculate the means and standard deviations
means = df[columns_to_normalize].mean()
stds = df[columns_to_normalize].std()
# Check if there are constant features
const_feat = list(stds[stds == 0].index)
if len(const_feat) > 0:
# Prevent constant features from being normalized
[columns_to_normalize.remove(col) for col in const_feat]
means = means.drop(const_feat)
stds = stds.drop(const_feat)
warnings.warn(f'Found columns {const_feat} to be constant throughout all the data. They should be removed as no insight will be extracted from them.')
if isinstance(df, dd.DataFrame):
# Make sure that the values are computed, in case we're using Dask
means = means.compute()
stds = stds.compute()
# Check if the data being normalized is directly the dataframe
if data is None:
if not inplace:
# Make a copy of the data to avoid potentially unwanted changes to the original dataframe
data = df.copy()
else:
# Use the original dataframe
data = df
# Normalize the right columns
if columns_to_normalize is not False:
print(f'z-score normalizing columns {columns_to_normalize}...')
data[columns_to_normalize] = (data[columns_to_normalize] - means) / stds
if columns_to_normalize_categ is not None:
if get_stats is True:
mean_list = []
std_list = []
# Make sure that the columns_to_normalize_categ is a list
if isinstance(columns_to_normalize_categ, tuple):
columns_to_normalize_categ = [columns_to_normalize_categ]
if not isinstance(columns_to_normalize_categ, list):
raise Exception(f'ERROR: The `columns_to_normalize_categ` argument must be specified as either a single tuple or a list of tuples. Received input with type {type(columns_to_normalize_categ)}.')
print(f'z-score normalizing columns {columns_to_normalize_categ} by their associated categories...')
for col_tuple in utils.iterations_loop(columns_to_normalize_categ, see_progress=see_progress):
categ_columns = col_tuple[0]
column_to_normalize = col_tuple[1]
# Calculate the means and standard deviations
means_grpb = df.groupby(categ_columns)[column_to_normalize].mean()
stds_grpb = df.groupby(categ_columns)[column_to_normalize].std()
if isinstance(df, dd.DataFrame):
# Make sure that the values are computed, in case we're using Dask
means_grpb = means_grpb.compute()
stds_grpb = stds_grpb.compute()
if get_stats is True:
if isinstance(column_to_normalize, str):
# Make sure that the feature being normalized has its name specified in the stats
tmp_mean_grpb = dict()
tmp_std_grpb = dict()
tmp_mean_grpb[column_to_normalize] = means_grpb.to_dict()
tmp_std_grpb[column_to_normalize] = stds_grpb.to_dict()
# Add the current stats values to the output lists
mean_list.append(tmp_mean_grpb)
std_list.append(tmp_std_grpb)
else:
# Add the current stats values to the output lists
mean_list.append(means_grpb.to_dict())
std_list.append(stds_grpb.to_dict())
# Get the categories columns as a numpy array, so as to
# index the groupby-resulting dataframes of mean and standard
# deviation values
cat_arr = df[categ_columns].to_numpy()
if isinstance(categ_columns, list) and len(categ_columns) > 1:
# Convert the sets of values into tuples so as to be
# properly readable as dataframe indices
cat_arr = list(map(tuple, cat_arr))
# Get the mean and standard deviation values in the same
# order as the original dataframe's row order
means_cat = means_grpb.loc[cat_arr].to_numpy()
stds_cat = stds_grpb.loc[cat_arr].to_numpy()
# Normalize the right categories
data[column_to_normalize] = (data[column_to_normalize] - means_cat) / stds_cat
if get_stats is True:
# Merge all the stats dictionaries
mean_categ_dict = utils.merge_dicts(mean_list)
std_categ_dict = utils.merge_dicts(std_list)
# Otherwise, the tensor is normalized
else:
if columns_to_normalize is not False:
# Dictionaries to retrieve the mean and standard deviation values
column_means = dict(means)
column_stds = dict(stds)
# Dictionary to convert the tensor's column indices into the dataframe's column names
idx_to_name = dict(enumerate(df.columns))
# Dictionary to convert the dataframe's column names into the tensor's column indices
name_to_idx = dict([(t[1], t[0]) for t in enumerate(df.columns)])
# List of indices of the tensor's columns which are needing normalization
tensor_columns_to_normalize = [name_to_idx[name] for name in columns_to_normalize]
# Normalize the right columns
print(f'z-score normalizing columns {columns_to_normalize}...')
for col in utils.iterations_loop(tensor_columns_to_normalize, see_progress=see_progress):
data[:, :, col] = ((data[:, :, col] - column_means[idx_to_name[col]])
/ column_stds[idx_to_name[col]])
if get_stats is False:
return data
elif columns_to_normalize is not False and columns_to_normalize_categ is not None:
return data, means.to_dict(), stds.to_dict(), mean_categ_dict, std_categ_dict
elif columns_to_normalize is not False and columns_to_normalize_categ is None:
return data, means.to_dict(), stds.to_dict()
elif columns_to_normalize is False and columns_to_normalize_categ is not None:
return data, mean_categ_dict, std_categ_dict
elif normalization_method.lower() == 'min-max':
if columns_to_normalize is not False:
mins = df[columns_to_normalize].min()
maxs = df[columns_to_normalize].max()
# Check if there are constant features
const_feat = list(mins[mins == maxs].index)
if len(const_feat) > 0:
# Prevent constant features from being normalized
[columns_to_normalize.remove(col) for col in const_feat]
mins = mins.drop(const_feat)
maxs = maxs.drop(const_feat)
warnings.warn(f'Found columns {const_feat} to be constant throughout all the data. They should be removed as no insight will be extracted from them.')
if isinstance(df, dd.DataFrame):
# Make sure that the values are computed, in case we're using Dask
mins = mins.compute()
maxs = maxs.compute()
# Check if the data being normalized is directly the dataframe
if data is None:
if not inplace:
# Make a copy of the data to avoid potentially unwanted changes to the original dataframe
data = df.copy()
else:
# Use the original dataframe
data = df
if columns_to_normalize is not False:
# Normalize the right columns
print(f'min-max normalizing columns {columns_to_normalize}...')
data[columns_to_normalize] = (data[columns_to_normalize] - mins) / (maxs - mins)
if columns_to_normalize_categ is not None:
if get_stats is True:
min_list = []
max_list = []
# Make sure that the columns_to_normalize_categ is a list
if isinstance(columns_to_normalize_categ, tuple):
columns_to_normalize_categ = [columns_to_normalize_categ]
if not isinstance(columns_to_normalize_categ, list):
raise Exception(f'ERROR: The `columns_to_normalize_categ` argument must be specified as either a single tuple or a list of tuples. Received input with type {type(columns_to_normalize_categ)}.')
print(f'min-max normalizing columns {columns_to_normalize_categ} by their associated categories...')
for col_tuple in columns_to_normalize_categ:
categ_columns = col_tuple[0]
column_to_normalize = col_tuple[1]
# Calculate the minimum and maximum values
mins_grpb = df.groupby(col_tuple[0])[col_tuple[1]].min()
maxs_grpb = df.groupby(col_tuple[0])[col_tuple[1]].max()
if isinstance(df, dd.DataFrame):
# Make sure that the values are computed, in case we're using Dask
mins_grpb = mins_grpb.compute()
maxs_grpb = maxs_grpb.compute()
if get_stats is True:
if isinstance(column_to_normalize, str):
# Make sure that the feature being normalized has its name specified in the stats
tmp_min_grpb = dict()
tmp_max_grpb = dict()
tmp_min_grpb[column_to_normalize] = mins_grpb.to_dict()
tmp_max_grpb[column_to_normalize] = maxs_grpb.to_dict()
# Add the current stats values to the output lists
min_list.append(tmp_min_grpb)
max_list.append(tmp_max_grpb)
else:
# Add the current stats values to the output lists
min_list.append(mins_grpb.to_dict())
max_list.append(maxs_grpb.to_dict())
# Get the categories columns as a numpy array, so as to
# index the groupby-resulting dataframes of minimum and
# maximum values
cat_arr = df[categ_columns].to_numpy()
if isinstance(categ_columns, list) and len(categ_columns) > 1:
# Convert the sets of values into tuples so as to be
# properly readable as dataframe indices
cat_arr = list(map(tuple, cat_arr))
# Get the minimum and maximum values in the same
# order as the original dataframe's row order
mins_cat = mins_grpb.loc[cat_arr].to_numpy()
maxs_cat = maxs_grpb.loc[cat_arr].to_numpy()
# Normalize the right categories
data[column_to_normalize] = (data[column_to_normalize] - mins_cat) / (maxs_cat - mins_cat)
if get_stats is True:
# Merge all the stats dictionaries
min_categ_dict = utils.merge_dicts(min_list)
max_categ_dict = utils.merge_dicts(max_list)
# Otherwise, the tensor is normalized
else:
if columns_to_normalize is not False:
# Dictionaries to retrieve the min and max values
column_mins = dict(mins)
column_maxs = dict(maxs)
# Dictionary to convert the tensor's column indices into the dataframe's column names
idx_to_name = dict(enumerate(df.columns))
# Dictionary to convert the dataframe's column names into the tensor's column indices
name_to_idx = dict([(t[1], t[0]) for t in enumerate(df.columns)])
# List of indices of the tensor's columns which are needing normalization
tensor_columns_to_normalize = [name_to_idx[name] for name in columns_to_normalize]
# Normalize the right columns
print(f'min-max normalizing columns {columns_to_normalize}...')
for col in utils.iterations_loop(tensor_columns_to_normalize, see_progress=see_progress):
data[:, :, col] = ((data[:, :, col] - column_mins[idx_to_name[col]])
/ (column_maxs[idx_to_name[col]] - column_mins[idx_to_name[col]]))
if get_stats is False:
return data
elif columns_to_normalize is not False and columns_to_normalize_categ is not None:
return data, mins.to_dict(), maxs.to_dict(), min_categ_dict, max_categ_dict
elif columns_to_normalize is not False and columns_to_normalize_categ is None:
return data, mins.to_dict(), maxs.to_dict()
elif columns_to_normalize is False and columns_to_normalize_categ is not None:
return data, min_categ_dict, max_categ_dict
else:
raise ValueError(f'{normalization_method} isn\'t a valid normalization method. '
'Available options are "z-score" and "min-max".')
def denormalize_data(df=None, data=None, id_columns=['patientunitstayid', 'ts'],
denormalization_method='z-score', columns_to_denormalize=None,
columns_to_denormalize_categ=None, categ_columns=None,
see_progress=True, search_by_dtypes=False, inplace=False,
means=None, stds=None, mins=None, maxs=None,
feature_columns=None):
'''Performs data denormalization to a continuous valued tensor or dataframe,
changing the scale of the data.
Parameters
----------
df : pandas.DataFrame or dask.DataFrame, default None
Original Pandas or Dask dataframe which is used to correctly calculate the
necessary statistical values used in the denormalization. These values
can't be calculated from the tensor as it might have been padded. If
the data tensor isn't specified, the denormalization is applied directly
on the dataframe.
data : torch.Tensor or numpy.Array, default None
PyTorch tensor or NumPy array corresponding to the data which will be
denormalized by the specified denormalization method. If the data isn't
specified, the denormalization is applied directly on the dataframe.
id_columns : string or list of strings, default ['patientunitstayid', 'ts']
List of columns names which represent identifier columns. These are not
supposed to be denormalized.
denormalization_method : string, default 'z-score'
Specifies the denormalization method used. It can be a z-score
denormalization, where the data is multiplied by the standard deviation
and added to the mean, reverting a previous z-score normalization; or it
can be a min-max denormalization, where the data is multiplied by the
difference between the maximum and the minimum value and then added to
the minimum value, reverting a previous min-max normalization.
columns_to_denormalize : string or list of strings, default None
If specified, the columns provided in the list are the only ones that
will be denormalized. If set to False, no column will be denormalized directly,
although columns can still be denormalized in groups of categories, if
specified in the `columns_to_denormalize_categ` parameter. Otherwise, all
continuous columns will be denormalized.
columns_to_denormalize_categ : tuple or list of tuples, default None
If specified, the columns provided in the list are going to be
denormalized on their categories. That is, the values (column 2 in the
tuple) are denormalized with stats of their respective categories (column
1 of the tuple). Otherwise, no column will be denormalized on their
categories.
categ_columns : string or list of strings, default None
If specified, the columns in the list, which represent categorical
features, which either are a label or will be embedded, aren't
going to be denormalized.
see_progress : bool, default True
If set to True, a progress bar will show up indicating the execution
of the denormalization calculations.
search_by_dtypes : bool, default False
If set to True, the method will only look for boolean columns based on
their data type. This is only reliable if all the columns' data types
have been properly set.
inplace : bool, default False
If set to True, the original dataframe will be used and modified
directly. Otherwise, a copy will be created and returned, without
changing the original dataframe.
means : dict or pandas.Series, default None
Previously computed mean values to use in the z-score denormalization,
keyed by column name. If not provided, they are computed from `df`.
stds : dict or pandas.Series, default None
Previously computed standard deviation values to use in the z-score
denormalization, keyed by column name. If not provided, they are
computed from `df`.
mins : dict or pandas.Series, default None
Previously computed minimum values to use in the min-max denormalization,
keyed by column name. If not provided, they are computed from `df`.
maxs : dict or pandas.Series, default None
Previously computed maximum values to use in the min-max denormalization,
keyed by column name. If not provided, they are computed from `df`.
feature_columns : list of strings, default None
Names of the columns, in the same order as the columns of the data
array or tensor, used to map statistics to column indices. If not
provided, the dataframe's columns are used.
Returns
-------
data : pandas.DataFrame or dask.DataFrame or torch.Tensor
Denormalized Pandas or Dask dataframe or PyTorch tensor.
'''
# [TODO] Add the option in denormalize_data to denormalize a data tensor
# using a norm_stats dictionary instead of fetching the denormalization
# stats from the original dataframe
if feature_columns is None and df is not None:
# List of all columns in the dataframe
feature_columns = list(df.columns)
# Check if specific columns have been specified for denormalization
if columns_to_denormalize is None:
# Denormalize all non identifier continuous columns, ignore one hot encoded ones
columns_to_denormalize = feature_columns.copy()
if id_columns is not None:
# Make sure that the id_columns is a list
if isinstance(id_columns, str):
id_columns = [id_columns]
if not isinstance(id_columns, list):
raise Exception(f'ERROR: The `id_columns` argument must be specified as either a single string or a list of strings. Received input with type {type(id_columns)}.')
# List of all columns in the dataframe, except the ID columns
[columns_to_denormalize.remove(col) for col in id_columns]
if categ_columns is not None:
# Make sure that the categ_columns is a list
if isinstance(categ_columns, str):
categ_columns = [categ_columns]
if not isinstance(categ_columns, list):
raise Exception(f'ERROR: The `categ_columns` argument must be specified as either a single string or a list of strings. Received input with type {type(categ_columns)}.')
# Prevent all features that will be embedded from being denormalized
[columns_to_denormalize.remove(col) for col in categ_columns]
# List of boolean or one hot encoded columns
boolean_cols = search_explore.list_boolean_columns(df[columns_to_denormalize], search_by_dtypes=search_by_dtypes)
if boolean_cols is not None:
# Prevent boolean features from being denormalized
[columns_to_denormalize.remove(col) for col in boolean_cols]
# Remove all non numeric columns that could be left
columns_to_denormalize = [col for col in columns_to_denormalize
if df[col].dtype == int or df[col].dtype == float]
if len(columns_to_denormalize) == 0:
print('No columns to denormalize, returning the original dataframe.')
return df
# Make sure that the columns_to_denormalize is a list
if isinstance(columns_to_denormalize, str):
columns_to_denormalize = [columns_to_denormalize]
if not isinstance(columns_to_denormalize, list) and not isinstance(columns_to_denormalize, bool):
raise Exception(f'ERROR: The `columns_to_denormalize` argument must be specified as either a single string, a list of strings or a boolean. Received input with type {type(columns_to_denormalize)}.')
if type(denormalization_method) is not str:
raise ValueError('Argument denormalization_method should be a string. Available options are "z-score" and "min-max".')
if denormalization_method.lower() == 'z-score':
if columns_to_denormalize is not False:
# Calculate the means and standard deviations
if means is None:
means = df[columns_to_denormalize].mean()
if stds is None:
stds = df[columns_to_denormalize].std()
# Check if there are constant features
if isinstance(stds, pd.Series):
const_feat = list(stds[stds == 0].index)
elif isinstance(stds, dict):
const_feat = [feat for feat in stds.keys() if stds[feat] == 0]
if len(const_feat) > 0:
# Prevent constant features from being denormalized
[columns_to_denormalize.remove(col) for col in const_feat]
if isinstance(stds, dict):
means = {col: val for col, val in means.items() if col not in const_feat}
stds = {col: val for col, val in stds.items() if col not in const_feat}
else:
means = means.drop(const_feat)
stds = stds.drop(const_feat)
warnings.warn(f'Found columns {const_feat} to be constant throughout all the data. They should be removed as no insight will be extracted from them.')
if isinstance(df, dd.DataFrame):
# Make sure that the values are computed, in case we're using Dask
means = means.compute()
stds = stds.compute()
# Check if the data being denormalized is directly the dataframe
if data is None:
if not inplace:
# Make a copy of the data to avoid potentially unwanted changes to the original dataframe
data = df.copy()
else:
# Use the original dataframe
data = df
# Denormalize the right columns
if columns_to_denormalize is not False:
print(f'z-score denormalizing columns {columns_to_denormalize}...')
data[columns_to_denormalize] = data[columns_to_denormalize] * stds + means
if columns_to_denormalize_categ is not None:
# Make sure that the columns_to_denormalize_categ is a list
if isinstance(columns_to_denormalize_categ, tuple):
columns_to_denormalize_categ = [columns_to_denormalize_categ]
if not isinstance(columns_to_denormalize_categ, list):
raise Exception(f'ERROR: The `columns_to_denormalize_categ` argument must be specified as either a single tuple or a list of tuples. Received input with type {type(columns_to_denormalize_categ)}.')
print(f'z-score denormalizing columns {columns_to_denormalize_categ} by their associated categories...')
for col_tuple in utils.iterations_loop(columns_to_denormalize_categ, see_progress=see_progress):
categ_columns = col_tuple[0]
column_to_denormalize = col_tuple[1]
# Calculate the means and standard deviations
means_grpb = df.groupby(categ_columns)[column_to_denormalize].mean()
stds_grpb = df.groupby(categ_columns)[column_to_denormalize].std()
if isinstance(df, dd.DataFrame):
# Make sure that the values are computed, in case we're using Dask
means_grpb = means_grpb.compute()
stds_grpb = stds_grpb.compute()
# Get the categories columns as a numpy array, so as to
# index the groupby-resulting dataframes of mean and standard
# deviation values
cat_arr = df[categ_columns].to_numpy()
if isinstance(categ_columns, list) and len(categ_columns) > 1:
# Convert the sets of values into tuples so as to be
# properly readable as dataframe indices
cat_arr = list(map(tuple, cat_arr))
# Get the mean and standard deviation values in the same
# order as the original dataframe's row order
means_cat = means_grpb.loc[cat_arr].to_numpy()
stds_cat = stds_grpb.loc[cat_arr].to_numpy()
# Denormalize the right categories
data[column_to_denormalize] = data[column_to_denormalize] * stds_cat + means_cat
# Otherwise, the array is denormalized
else:
if not inplace:
# Make a copy of the data to avoid potentially unwanted changes to the original array
if isinstance(data, torch.Tensor):
data = data.clone()
else:
data = data.copy()
else:
# Use the original array
data = data
if columns_to_denormalize is not False:
# Dictionaries to retrieve the mean and standard deviation values
if not isinstance(means, dict):
means = dict(means)
if not isinstance(stds, dict):
stds = dict(stds)
# Dictionary to convert the array's column indices into the dataframe's column names
idx_to_name = dict(enumerate(feature_columns))
# Dictionary to convert the dataframe's column names into the array's column indices
name_to_idx = dict([(t[1], t[0])
for t in enumerate(feature_columns)])
# List of indices of the array's columns which are needing denormalization
array_columns_to_denormalize = [name_to_idx[name]
for name in columns_to_denormalize]
# Denormalize the right columns
print(f'z-score denormalizing columns {columns_to_denormalize}...')
for col in utils.iterations_loop(array_columns_to_denormalize, see_progress=see_progress):
if len(data.shape) == 3:
data[:, :, col] = data[:, :, col] * stds[idx_to_name[col]] + means[idx_to_name[col]]
elif len(data.shape) == 2:
data[:, col] = data[:, col] * stds[idx_to_name[col]] + means[idx_to_name[col]]
else:
raise Exception(f'ERROR: The data array or tensor must be either two or three-dimensional. The provided data has {len(data.shape)} dimensions.')
return data
elif denormalization_method.lower() == 'min-max':
if columns_to_denormalize is not False:
if mins is None:
mins = df[columns_to_denormalize].min()
if maxs is None:
maxs = df[columns_to_denormalize].max()
# Check if there are constant features
const_feat = list(mins[mins == maxs].index)
if len(const_feat) > 0:
# Prevent constant features from being denormalized
[columns_to_denormalize.remove(col) for col in const_feat]
mins = mins.drop(const_feat)
maxs = maxs.drop(const_feat)
warnings.warn(f'Found columns {const_feat} to be constant throughout all the data. They should be removed as no insight will be extracted from them.')
if isinstance(df, dd.DataFrame):
# Make sure that the values are computed, in case we're using Dask
mins = mins.compute()
maxs = maxs.compute()
# Check if the data being denormalized is directly the dataframe
if data is None:
if not inplace:
# Make a copy of the data to avoid potentially unwanted changes to the original dataframe
data = df.copy()
else:
# Use the original dataframe
data = df
if columns_to_denormalize is not False:
# Denormalize the right columns
print(f'min-max denormalizing columns {columns_to_denormalize}...')
data[columns_to_denormalize] = data[columns_to_denormalize] * (maxs - mins) + mins
if columns_to_denormalize_categ is not None:
# Make sure that the columns_to_denormalize_categ is a list
if isinstance(columns_to_denormalize_categ, tuple):
columns_to_denormalize_categ = [columns_to_denormalize_categ]
if not isinstance(columns_to_denormalize_categ, list):
raise Exception(f'ERROR: The `columns_to_denormalize_categ` argument must be specified as either a single tuple or a list of tuples. Received input with type {type(columns_to_denormalize_categ)}.')
print(f'min-max denormalizing columns {columns_to_denormalize_categ} by their associated categories...')
for col_tuple in columns_to_denormalize_categ:
categ_columns = col_tuple[0]
column_to_denormalize = col_tuple[1]
# Calculate the minimum and maximum values
mins_grpb = df.groupby(col_tuple[0])[col_tuple[1]].min()
maxs_grpb = df.groupby(col_tuple[0])[col_tuple[1]].max()
if isinstance(df, dd.DataFrame):
# Make sure that the values are computed, in case we're using Dask
mins_grpb = mins_grpb.compute()
maxs_grpb = maxs_grpb.compute()
# Get the categories columns as a numpy array, so as to
# index the groupby-resulting dataframes of minimum and
# maximum values
cat_arr = df[categ_columns].to_numpy()
if isinstance(categ_columns, list) and len(categ_columns) > 1:
# Convert the sets of values into tuples so as to be
# properly readable as dataframe indices
cat_arr = list(map(tuple, cat_arr))
# Get the minimum and maximum values in the same
# order as the original dataframe's row order
mins_cat = mins_grpb.loc[cat_arr].to_numpy()
maxs_cat = maxs_grpb.loc[cat_arr].to_numpy()
# Denormalize the right categories
data[column_to_denormalize] = data[column_to_denormalize] * (maxs_cat - mins_cat) + mins_cat
# Otherwise, the array is denormalized
else:
if not inplace:
# Make a copy of the data to avoid potentially unwanted changes to the original array
data = data.clone()
else:
# Use the original array
data = data
if columns_to_denormalize is not False:
# Dictionaries to retrieve the min and max values
column_mins = dict(mins)
column_maxs = dict(maxs)
# Dictionary to convert the array's column indices into the dataframe's column names
idx_to_name = dict(enumerate(feature_columns))
# Dictionary to convert the dataframe's column names into the array's column indices
name_to_idx = dict([(t[1], t[0])
for t in enumerate(feature_columns)])
# List of indices of the array's columns that need denormalization
array_columns_to_denormalize = [name_to_idx[name] for name in columns_to_denormalize]
# Denormalize the right columns
print(f'min-max denormalizing columns {columns_to_denormalize}...')
for col in utils.iterations_loop(array_columns_to_denormalize, see_progress=see_progress):
if len(data.shape) == 3:
data[:, :, col] = (data[:, :, col] * (column_maxs[idx_to_name[col]] - column_mins[idx_to_name[col]])
+ column_mins[idx_to_name[col]])
elif len(data.shape) == 2:
data[:, col] = (data[:, col] * (column_maxs[idx_to_name[col]] - column_mins[idx_to_name[col]])
+ column_mins[idx_to_name[col]])
else:
raise Exception(f'ERROR: The data array or tensor must be either two or three-dimensional. The provided data has {len(data.shape)} dimensions.')
return data
else:
raise ValueError(f'{denormalization_method} isn\'t a valid denormalization method. Available options are "z-score" and "min-max".')
def transpose_dataframe(df, column_to_transpose=None, inplace=False):
'''Transpose a dataframe, either by its original index or through a specific
column, which will be converted to the new column names (i.e. the header).
Parameters
----------
df : pandas.DataFrame or dask.DataFrame
Dataframe that will be transposed.
column_to_transpose : string, default None
If specified, the given column will be used as the new column names, with
its unique values forming the new dataframe's header. Otherwise, the
dataframe will be transposed on its original index.
inplace : bool, default False
If set to True, the original tensor or dataframe will be used and modified
directly. Otherwise, a copy will be created and returned, without
changing the original tensor or dataframe.
Returns
-------
data_df : pandas.DataFrame or dask.DataFrame
Transposed dataframe.
'''
if not inplace:
# Make a copy of the data to avoid potentially unwanted changes to the original dataframe
data_df = df.copy()
else:
# Use the original dataframe
data_df = df
if column_to_transpose is not None:
# Set as index the column that has the desired column names as values
data_df = data_df.set_index(column_to_transpose)
if isinstance(data_df, pd.DataFrame):
data_df = data_df.transpose()
elif isinstance(data_df, dd.DataFrame):
data_df = (dd.from_pandas(data_df.compute().transpose(),
npartitions=data_df.npartitions))
else:
raise Exception(f'ERROR: The input data must either be a Pandas dataframe or a Dask dataframe, not {type(df)}.')
return data_df
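# Illustrative sketch (hypothetical lab data): pivoting a small results table so
# that each test name becomes a column header.
def _example_transpose_dataframe():
    example_df = pd.DataFrame({'lab_test': ['glucose', 'sodium', 'potassium'],
                               'value': [90, 140, 4.2]})
    # The 'lab_test' values become the new header; the remaining data is transposed
    return transpose_dataframe(example_df, column_to_transpose='lab_test')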
def merge_values(x1, x2, separator=';', str_over_num=True, join_strings=True,
is_bool=False):
'''Merge two values, by extracting the non-missing one, their average value
or the non-numeric one.
Parameters
----------
x1
Value 1 of the merge operation.
x2
Value 2 of the merge operation.
separator : string, default ';'
Symbol that concatenates each string's words, which will be used to join
the inputs if they are both strings.
str_over_num : bool, default True
If set to True, preference will be given to string inputs. Otherwise,
numeric inputs will be prioritized.
join_strings : bool, default True
If set to True, in case of receiving two string inputs, the algorithm
will join them using the defined separator. Otherwise, the shortest
string will be returned.
is_bool : bool, default False
If set to True, the method will treat the values to merge as boolean
(i.e. it will return either 1, if it's one of the values, or 0).
Returns
-------
x
Resulting merged value.
'''
if is_bool is True:
if (x1 is None or utils.is_num_nan(x1)) and (x2 is None or utils.is_num_nan(x2)):
return 0
elif (x1 is None or utils.is_num_nan(x1)) and not (x2 is None or utils.is_num_nan(x2)):
return x2
elif not (x1 is None or utils.is_num_nan(x1)) and (x2 is None or utils.is_num_nan(x2)):
return x1
else:
return max(x1, x2)
if x1 is None and x2 is not None:
return x2
elif x1 is not None and x2 is None:
return x1
elif x1 == x2:
return x1
elif ((isinstance(x1, float) or isinstance(x1, int))
and (isinstance(x2, float) or isinstance(x2, int))):
# Get the average value between the columns, ignoring NaNs
return np.nanmean([x1, x2])
elif isinstance(x1, str) and isinstance(x2, str):
if not isinstance(separator, str):
raise Exception(f'ERROR: Separator symbol must be in string format, not {type(separator)}.')
if join_strings is True:
# Join strings through the defined separator
return separator.join([x1, x2])
else:
# Return the shortest string
if len(x1) <= len(x2):
return x1
else:
return x2
elif ((isinstance(x1, float) or isinstance(x1, int))
and not (isinstance(x2, float) or isinstance(x2, int))):
if utils.is_num_nan(x1) and not utils.is_num_nan(x2):
# Return the not NaN value
return x2
if str_over_num is True:
# Give preference to string values
return x2
else:
# Give preference to numeric values
return x1
elif (not (isinstance(x1, float) or isinstance(x1, int))
      and (isinstance(x2, float) or isinstance(x2, int))):
if utils.is_num_nan(x2) and not utils.is_num_nan(x1):
# Return the not NaN value
return x1
if str_over_num is True:
# Give preference to string values
return x1
else:
# Give preference to numeric values
return x2
else:
warnings.warn(f'Both values are non-numeric and distinct. Arbitrarily returning the first value {x1}, instead of {x2}.')
return x1
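# Illustrative sketch: direct calls to merge_values showing its main merge rules.
def _example_merge_values():
    averaged = merge_values(1.0, np.nan)                # -> 1.0 (NaN-ignoring average)
    kept = merge_values('male', 'male')                 # -> 'male' (equal values are kept)
    joined = merge_values('a', 'b', join_strings=True)  # -> 'a;b' (strings joined by the separator)
    return averaged, kept, joined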
def merge_columns(df, cols_to_merge=None, drop_old_cols=True, separator=';',
join_strings=False, see_progress=True, inplace=False):
'''Merge columns that have been created, as a consequence of a dataframe
merge operation, resulting in duplicate columns with suffixes.
Parameters
----------
df : pandas.DataFrame or dask.DataFrame
Dataframe that will have its columns merged.
cols_to_merge : string or list of strings, default None
The columns which will be regenerated, by merging its duplicates.
If not specified, the algorithm will search for columns with suffixes.
drop_old_cols : bool, default True
If set to True, the preexisting duplicate columns will be removed.
separator : string, default ';'
Symbol that concatenates each string's words, which will be used to join
the inputs if they are both strings.
join_strings : bool, default False
If set to True, in case of receiving two string inputs, the algorithm
will join them using the defined separator. Otherwise, the shortest
string will be returned.
see_progress : bool, default True
If set to True, a progress bar will show up indicating the execution
of the normalization calculations.
inplace : bool, default False
If set to True, the original tensor or dataframe will be used and modified
directly. Otherwise, a copy will be created and returned, without
changing the original tensor or dataframe.
Returns
-------
data_df : pandas.DataFrame or dask.DataFrame
Dataframe with the new merged columns.
'''
if not inplace:
# Make a copy of the data to avoid potentially unwanted changes to the original dataframe
data_df = df.copy()
else:
# Use the original dataframe
data_df = df
if cols_to_merge is None:
print('Finding columns to merge...')
# Find all columns that have typical merging suffixes
cols_to_merge = set([col.split('_x')[0].split('_y')[0] for col in df.columns
if col.endswith('_x') or col.endswith('_y')])
# Make sure that the cols_to_merge is a list
if isinstance(cols_to_merge, str):
cols_to_merge = [cols_to_merge]
print('Merging the duplicate columns...')
for col in utils.iterations_loop(cols_to_merge, see_progress=see_progress):
# Check if the columns being merged are boolean
is_bool = all(search_explore.is_boolean_column(data_df, c, n_unique_values=None)
              for c in [f'{col}_x', f'{col}_y'])
# Create a column, with the original name, merging the associated columns' values
data_df[col] = data_df.apply(lambda x: merge_values(x[f'{col}_x'], x[f'{col}_y'],
separator=separator,
join_strings=join_strings,
is_bool=is_bool), axis=1)
if drop_old_cols:
print('Removing old columns...')
# Remove the old columns, with suffixes `_x` and '_y', which resulted
# from the merge of dataframes
for col in utils.iterations_loop(cols_to_merge, see_progress=see_progress):
data_df = data_df.drop(columns=[f'{col}_x', f'{col}_y'])
print('Done!')
return data_df
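# Illustrative sketch (hypothetical data): collapsing the '_x'/'_y' duplicates that
# appear after an outer merge of two dataframes sharing a 'weight' column.
def _example_merge_columns():
    df_a = pd.DataFrame({'subject_id': [1, 2], 'weight': [70.0, np.nan]})
    df_b = pd.DataFrame({'subject_id': [1, 2], 'weight': [np.nan, 82.0]})
    merged_df = pd.merge(df_a, df_b, on='subject_id', how='outer')
    # 'weight_x' and 'weight_y' are merged back into a single 'weight' column
    return merge_columns(merged_df, cols_to_merge='weight')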
def missing_values_imputation(data, columns_to_imputate=None, method='zero',
id_column=None, zero_bool=True, reset_index=True,
search_by_dtypes=False, inplace=False):
'''Performs missing values imputation on a tensor (corresponding to a single
column) or on a dataframe.
NOTE: Most imputation methods don't work with float16 data types and
interpolation can't be applied to nullable integer types.
Parameters
----------
data : torch.Tensor or pandas.DataFrame or dask.DataFrame
PyTorch tensor corresponding to a single column or a dataframe which will
be imputed.
columns_to_imputate : str or list of str, default None
Specific column(s) to run missing values imputation on. Might be useful
if some columns should be imputed with a specific method, different from
the rest. If left unspecified, all columns will be imputed with the
same method.
method : string, default 'zero'
Imputation method to be used. If user inputs 'zero', it will just fill all
missing values with zero. If the user chooses 'zigzag', it will do a
forward fill, a backward fill and then replace all remaining missing values
with zero (this option is only available for dataframes, not tensors).
If the user selects 'interpolation', missing data will be interpolated based
on known neighboring values and then all possible remaining ones are
replaced with zero (this option is only available for dataframes, not
tensors).
id_column : string, default None
Name of the column which corresponds to the sequence or subject identifier
in the dataframe. If not specified, the imputation will not differentiate
between different IDs or sequences. Only used if the chosen imputation method is
'zigzag' or 'interpolation'.
zero_bool : bool, default True
If set to True, it will look for boolean features and replace their
missing values with zero, regardless of the chosen imputation method.
reset_index : bool, default True
If set to True (recommended), the dataframe's index will be reset. This
can prevent values from being assigned to the wrong rows.
search_by_dtypes : bool, default False
If set to True, the method will only look for boolean columns based on
their data type. This is only reliable if all the columns' data types
have been properly set.
inplace : bool, default False
If set to True, the original tensor or dataframe will be used and modified
directly. Otherwise, a copy will be created and returned, without
changing the original tensor or dataframe.
Returns
-------
data_copy : torch.Tensor or pandas.DataFrame or dask.DataFrame
Imputed PyTorch tensor or dataframe.
'''
if ((not isinstance(data, pd.DataFrame))
and (not isinstance(data, dd.DataFrame))
and (not isinstance(data, torch.Tensor))):
raise Exception(f'ERROR: The input data must either be a PyTorch tensor, a Pandas dataframe or a Dask dataframe, not {type(data)}.')
if not inplace:
# Make a copy of the data to avoid potentially unwanted changes to the original data
if isinstance(data, torch.Tensor):
data_copy = data.clone()
else:
data_copy = data.copy()
else:
# Use the original data object
data_copy = data
# [TODO] Implement an option to only imputate specified column(s)
# if columns is None:
# columns = list(data_copy.columns)
if reset_index is True:
# Reset index to avoid assigning values in the wrong rows
print('Resetting the index...')
data_copy.reset_index(drop=True, inplace=True)
if columns_to_imputate is None:
# Imputate all the columns
columns_to_imputate = list(data_copy.columns)
# Make sure that the columns_to_imputate is a list
if isinstance(columns_to_imputate, str):
columns_to_imputate = [columns_to_imputate]
if id_column is not None:
# Make sure that the ID column is in columns_to_imputate
if id_column not in columns_to_imputate:
columns_to_imputate = [id_column] + columns_to_imputate
if zero_bool is True:
# Check if there are boolean features
print('Searching for boolean features...')
bool_feat = search_explore.list_boolean_columns(data_copy, search_by_dtypes=search_by_dtypes)
if len(bool_feat) > 0:
# Fill all boolean features' missing values with zeros
print('Replacing boolean features\' missing values with zero...')
data_copy.loc[:, bool_feat] = data_copy[bool_feat].fillna(value=0)
# Remove the boolean columns from the list of columns to imputate
columns_to_imputate = list(set(columns_to_imputate) - set(bool_feat))
if method.lower() == 'zero':
# Replace NaN's with zeros
print('Replacing missing values with zero...')
if isinstance(data, pd.DataFrame) or isinstance(data, dd.DataFrame):
data_copy.loc[:, columns_to_imputate] = data_copy[columns_to_imputate].fillna(value=0)
elif isinstance(data, torch.Tensor):
# [TODO] Add the ability to specify the tensor columns to imputate
data_copy = torch.where(data_copy != data_copy, torch.zeros_like(data_copy), data_copy)
elif method.lower() == 'zigzag':
if isinstance(data, pd.DataFrame) or isinstance(data, dd.DataFrame):
if id_column is not None:
# Perform imputation on each ID separately
# Forward fill and backward fill
print('Forward filling and backward filling missing values...')
data_copy.loc[:, columns_to_imputate] = data_copy[columns_to_imputate].groupby(id_column).apply(lambda group: group.ffill().bfill())
# Replace remaining missing values with zero
print('Replacing remaining missing values with zero...')
data_copy.loc[:, columns_to_imputate] = data_copy[columns_to_imputate].fillna(value=0)
else:
# Apply imputation on all the data as one single sequence
# Forward fill
print('Forward filling missing values...')
data_copy.loc[:, columns_to_imputate] = data_copy[columns_to_imputate].ffill()
# Backward fill
print('Backward filling missing values...')
data_copy.loc[:, columns_to_imputate] = data_copy[columns_to_imputate].bfill()
# Replace remaining missing values with zero
print('Replacing remaining missing values with zero...')
data_copy.loc[:, columns_to_imputate] = data_copy[columns_to_imputate].fillna(value=0)
elif isinstance(data, torch.Tensor):
raise Exception('ERROR: PyTorch tensors aren\'t supported in the zigzag imputation method. Please use a dataframe instead.')
elif method.lower() == 'interpolation':
if isinstance(data, pd.DataFrame) or isinstance(data, dd.DataFrame):
# Linear interpolation, placing a linear scale between known points and doing simple
# backward and forward fill, when the missing value doesn't have known data points
# before or after, respectively
# NOTE: Since the interpolate method doesn't work on nullable integer data types,
# we need to find and separate columns with that dtype and apply zigzag imputation on them
columns_cant_interpolate = list()
for col in list(columns_to_imputate):
if (('Int' in str(data[col].dtype) or 'boolean' in str(data[col].dtype))
and col != id_column):
columns_cant_interpolate.append(col)
columns_to_imputate.remove(col)
if id_column is not None:
try:
if len(columns_cant_interpolate) > 0:
# Perform zigzag imputation on columns that can't be interpolated
print('Running zigzag imputation on columns that can\'t be interpolated...')
print(f'(These columns are {columns_cant_interpolate})')
columns_cant_interpolate = [id_column] + columns_cant_interpolate
# Forward fill and backward fill
print('Forward filling and backward filling missing values...')
data_copy.loc[:, columns_cant_interpolate] = data_copy[columns_cant_interpolate].groupby(id_column).apply(lambda group: group.ffill().bfill())
# Replace remaining missing values with zero
print('Replacing remaining missing values with zero...')
data_copy.loc[:, columns_cant_interpolate] = data_copy[columns_cant_interpolate].fillna(value=0)
# There's no need to interpolate if the only column in columns_to_imputate is the ID column
if len(columns_to_imputate) > 1:
# Perform imputation on each ID separately
print('Interpolating missing values...')
data_copy.loc[:, columns_to_imputate] = data_copy[columns_to_imputate].groupby(id_column)[columns_to_imputate].apply(lambda group: group.interpolate(limit_direction='both'))
# Replace remaining missing values with zero
print('Replacing remaining missing values with zero...')
data_copy.loc[:, columns_to_imputate] = data_copy[columns_to_imputate].fillna(value=0)
except ValueError as e:
warnings.warn(f'Initial attempt to interpolate failed. Original exception message: "{str(e)}"\nTrying again after replacing all possible <NA> occurrences with a NumPy NaN.')
# Save the current data types
dtype_dict = dict(data_copy.dtypes)
# Replace the '<NA>' objects with NumPy's NaN
data_copy = data_copy.applymap(lambda x: x if not utils.is_num_nan(x) else np.nan)
print('Finished replacing all possible <NA> values.')
# Perform imputation on each ID separately
print('Interpolating missing values...')
data_copy.loc[:, columns_to_imputate] = data_copy[columns_to_imputate].groupby(id_column)[columns_to_imputate].apply(lambda group: group.interpolate(limit_direction='both'))
# Replace remaining missing values with zero
print('Replacing remaining missing values with zero...')
data_copy.loc[:, columns_to_imputate] = data_copy[columns_to_imputate].fillna(value=0)
# Convert the data types back to the original ones
print('Converting data types back to the original ones...')
data_copy = utils.convert_dtypes(data_copy, dtypes=dtype_dict, inplace=True)
else:
try:
if len(columns_cant_interpolate) > 0:
# Perform zigzag imputation on columns that can't be interpolated
print('Running zigzag imputation on columns that can\'t be interpolated...')
print(f'(These columns are {columns_cant_interpolate})')
# Forward fill
print('Forward filling missing values...')
data_copy.loc[:, columns_cant_interpolate] = data_copy[columns_cant_interpolate].ffill()
# Backward fill
print('Backward filling missing values...')
data_copy.loc[:, columns_cant_interpolate] = data_copy[columns_cant_interpolate].bfill()
# Replace remaining missing values with zero
print('Replacing remaining missing values with zero...')
data_copy.loc[:, columns_cant_interpolate] = data_copy[columns_cant_interpolate].fillna(value=0)
# There's no need to interpolate if columns_to_imputate is empty
if len(columns_to_imputate) > 0:
# Apply imputation on all the data as one single sequence
print('Interpolating missing values...')
data_copy.loc[:, columns_to_imputate] = data_copy[columns_to_imputate].interpolate(limit_direction='both')
# Replace remaining missing values with zero
print('Replacing remaining missing values with zero...')
data_copy.loc[:, columns_to_imputate] = data_copy[columns_to_imputate].fillna(value=0)
except ValueError as e:
warnings.warn(f'Initial attempt to interpolate failed. Original exception message: "{str(e)}"\nTrying again after replacing all possible <NA> occurrences with a NumPy NaN.')
# Save the current data types
dtype_dict = dict(data_copy.dtypes)
# Replace the '<NA>' objects with NumPy's NaN
data_copy = data_copy.applymap(lambda x: x if not utils.is_num_nan(x) else np.nan)
print('Finished replacing all possible <NA> values.')
# Apply imputation on all the data as one single sequence
print('Interpolating missing values...')
data_copy.loc[:, columns_to_imputate] = data_copy[columns_to_imputate].interpolate(limit_direction='both')
# Replace remaining missing values with zero
print('Replacing remaining missing values with zero...')
data_copy.loc[:, columns_to_imputate] = data_copy[columns_to_imputate].fillna(value=0)
# Convert the data types back to the original ones
print('Converting data types back to the original ones...')
data_copy = utils.convert_dtypes(data_copy, dtypes=dtype_dict, inplace=True)
elif isinstance(data, torch.Tensor):
raise Exception('ERROR: PyTorch tensors aren\'t supported in the interpolation imputation method. Please use a dataframe instead.')
else:
raise Exception(f'ERROR: Unsupported {method} imputation method. Currently available options are `zero`, `zigzag` and `interpolation`.')
# [TODO] Add other, more complex imputation methods, like a denoising autoencoder
print('Done!')
return data_copy
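# Illustrative sketch (hypothetical time series data): zigzag imputation applied
# separately to each subject's sequence.
def _example_missing_values_imputation():
    example_df = pd.DataFrame({'subject_id': [1, 1, 1, 2, 2],
                               'heart_rate': [np.nan, 80.0, np.nan, 75.0, np.nan]})
    # Forward fill, then backward fill, then zeros, within each subject_id
    return missing_values_imputation(example_df, method='zigzag',
                                     id_column='subject_id', zero_bool=False)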
def __sep_dosage_units(x):
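    '''Split a raw dosage string (e.g. '250 mg') into a numeric dosage and a
    unit string, returning NaN for whichever part can't be parsed.'''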
# Start by assuming that dosage and unit are unknown
dosage = np.nan
unit = np.nan
try:
x = x.split(' ')
if len(x) == 2:
try:
# Add correctly formatted dosage
dosage = float(x[0])
except Exception:
pass
try:
if utils.is_definitely_string(x[1]):
# Add correctly formatted unit values
unit = x[1]
except Exception:
pass
elif len(x) == 1:
try:
# Try to add correctly formatted dosage, even without units
dosage = float(x[0])
except Exception:
pass
except Exception:
try:
# Try to add correctly formatted dosage, even without units
dosage = float(x)
except Exception:
pass
return dosage, unit
def set_dosage_and_units(df, orig_column='dosage', new_column_names=['drug_dosage', 'drug_unit']):
'''Separate medication dosage string column into numeric dosage and units
features.
Parameters
----------
df : pandas.DataFrame or dask.DataFrame
Dataframe containing the medication dosage information.
orig_column : string, default 'dosage'
Name of the original column, which will be split in two.
new_column_names : list of strings, default ['drug_dosage', 'drug_unit']
Names of the two new columns that will receive, respectively, the numeric
dosage and the dosage unit.
Returns
-------
df : pandas.DataFrame or dask.DataFrame
Dataframe after adding the numeric dosage and units columns.
'''
# Separate the dosage and unit data
dosage_unit_data = df[orig_column].apply(__sep_dosage_units)
# Make sure that the new columns are created
for col in new_column_names:
df[col] = np.nan
# Add the new dosage and units columns
df[new_column_names] = pd.DataFrame(dosage_unit_data.to_numpy().tolist(),
index=dosage_unit_data.index)
return df
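# Illustrative sketch (hypothetical data): splitting a raw dosage string column
# into numeric dosage and unit columns.
def _example_set_dosage_and_units():
    example_df = pd.DataFrame({'dosage': ['250 mg', '5 ml', '10']})
    # Adds 'drug_dosage' (e.g. 250.0) and 'drug_unit' (e.g. 'mg') columns
    return set_dosage_and_units(example_df, orig_column='dosage')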
def signal_idx_derivative(s, time_scale='seconds', periods=1):
'''Creates a series that contains the signal's index derivative, with the
same divisions (if needed) as the original data and on the desired time
scale.
Parameters
----------
s : pandas.Series or dask.Series
Series whose index derivative will be calculated.
time_scale : string or bool, default 'seconds'
How to calculate derivatives, either with respect to the index values,
on the time scale of 'seconds', 'minutes', 'hours', 'days', 'months' or
'years', or just sequentially, just getting the difference between
consecutive values, 'False'. Only used if parameter 'signal' isn't set
to 'value'.
periods : int, default 1
Defines the steps to take when calculating the derivative. When set to 1,
it performs a normal backwards derivative. When set to -1, it performs a
normal forwards derivative.
Returns
-------
s_idx : pandas.Series or dask.Series
Index derivative signal, on the desired time scale.
'''
# Calculate the signal index's derivative
s_idx = s.index.to_series().diff(periods)
if isinstance(s_idx, dd.Series):
# Make the new derivative have the same divisions as the original signal
s_idx = (s_idx.to_frame().rename(columns={s.index.name:'tmp_val'})
.reset_index()
.set_index(s.index.name, sorted=True, divisions=s.divisions)
.tmp_val)
# Convert derivative to the desired time scale
if time_scale == 'seconds':
s_idx = s_idx.dt.total_seconds()
elif time_scale == 'minutes':
s_idx = s_idx.dt.total_seconds() / 60
elif time_scale == 'hours':
s_idx = s_idx.dt.total_seconds() / 3600
elif time_scale == 'days':
s_idx = s_idx.dt.total_seconds() / 86400
elif time_scale == 'months':
s_idx = s_idx.dt.total_seconds() / 2592000
elif time_scale == 'years':
s_idx = s_idx.dt.total_seconds() / 31536000
return s_idx
def threshold_outlier_detect(s, max_thrs=None, min_thrs=None, threshold_type='absolute',
signal_type='value', time_scale='seconds',
derivate_direction='backwards'):
'''Detects outliers based on predetermined thresholds.
Parameters
----------
s : pandas.Series or dask.Series
Series which will be analyzed for outlier detection.
max_thrs : int or float, default None
Maximum threshold, i.e. no normal value can be larger than this
threshold, in the signal (or its n-order derivative) that we're
analyzing.
min_thrs : int or float, default None
Minimum threshold, i.e. no normal value can be smaller than this
threshold, in the signal (or its n-order derivative) that we're
analyzing.
threshold_type : string, default 'absolute'
Determines if we're using threshold values with respect to the original
scale of values, 'absolute', relative to the signal's mean, 'mean' or
'average', to the median, 'median' or to the standard deviation, 'std'.
As such, the possible settings are ['absolute', 'mean', 'average',
'median', 'std'].
signal_type : string, default 'value'
Sets if we're analyzing the original signal value, 'value', its first
derivative, 'derivative' or 'speed', or its second derivative, 'second
derivative' or 'acceleration'. As such, the possible settings are
['value', 'derivative', 'speed', 'second derivative', 'acceleration'].
time_scale : string or bool, default 'seconds'
How to calculate derivatives, either with respect to the index values,
on the time scale of 'seconds', 'minutes', 'hours', 'days', 'months' or
'years', or just sequentially, just getting the difference between
consecutive values, 'False'. Only used if parameter 'signal' isn't set
to 'value'.
derivate_direction : string, default 'backwards'
The direction in which we calculate the derivative, either comparing to
previous values, 'backwards', or to the next values, 'forwards'. As such,
the possible settings are ['backwards', 'forwards']. Only used if
parameter 'signal' isn't set to 'value'.
Returns
-------
outlier_s : pandas.Series or dask.Series
Boolean series indicating where the detected outliers are.
'''
if signal_type.lower() == 'value':
signal = s
elif signal_type.lower() == 'derivative' or signal_type.lower() == 'speed':
if derivate_direction.lower() == 'backwards':
periods = 1
elif derivate_direction.lower() == 'forwards':
periods = -1
else:
raise Exception(f'ERROR: Invalid derivative direction. It must either be "backwards" or "forwards", not {derivate_direction}.')
# Calculate the difference between consecutive values
signal = s.diff(periods)
if time_scale is not None:
# Derivate by the index values
signal = signal / signal_idx_derivative(signal, time_scale, periods)
elif (signal_type.lower() == 'second derivative'
or signal_type.lower() == 'acceleration'):
if derivate_direction.lower() == 'backwards':
periods = 1
elif derivate_direction.lower() == 'forwards':
periods = -1
else:
raise Exception(f'ERROR: Invalid derivative direction. It must either be "backwards" or "forwards", not {derivate_direction}.')
# Calculate the difference between consecutive values
signal = s.diff(periods).diff(periods)
if time_scale is not None:
# Derivate by the index values
signal = signal / signal_idx_derivative(signal, time_scale, periods)
else:
raise Exception(f'ERROR: Invalid signal type. It must be "value", "derivative", "speed", "second derivative" or "acceleration", not {signal_type}.')
if threshold_type.lower() == 'absolute':
pass  # Keep the signal in its original scale
elif threshold_type.lower() == 'mean' or threshold_type.lower() == 'average':
signal_mean = signal.mean()
if isinstance(signal, dd.Series):
# Make sure that the value is computed, in case we're using Dask
signal_mean = signal_mean.compute()
# Normalize by the average value
signal = signal / signal_mean
elif threshold_type.lower() == 'median':
if isinstance(signal, dd.Series):
# Make sure that the value is computed, in case we're using Dask
signal_median = signal.compute().median()
else:
signal_median = signal.median()
# Normalize by the median value
signal = signal / signal_median
elif threshold_type.lower() == 'std':
signal_mean = signal.mean()
signal_std = signal.std()
if isinstance(signal, dd.Series):
# Make sure that the values are computed, in case we're using Dask
signal_mean = signal_mean.compute()
signal_std = signal_std.compute()
# Normalize by the average and standard deviation values
signal = (signal - signal_mean) / signal_std
else:
raise Exception(f'ERROR: Invalid value type. It must be "absolute", "mean", "average", "median" or "std", not {threshold_type}.')
# Search for outliers based on the given thresholds
if max_thrs is not None and min_thrs is not None:
outlier_s = (signal > max_thrs) | (signal < min_thrs)
elif max_thrs is not None:
outlier_s = signal > max_thrs
elif min_thrs is not None:
outlier_s = signal < min_thrs
else:
raise Exception('ERROR: At least a maximum or a minimum threshold must be set. Otherwise, no outlier will ever be detected.')
return outlier_s
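# Illustrative sketch (hypothetical vital signs): flagging readings above an
# absolute maximum threshold.
def _example_threshold_outlier_detect():
    example_s = pd.Series([70, 72, 180, 71, 69])
    # Returns a boolean series that is True only for the 180 reading
    return threshold_outlier_detect(example_s, max_thrs=150,
                                    threshold_type='absolute', signal_type='value')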
def slopes_outlier_detect(s, max_thrs=4, bidir_sens=0.5, threshold_type='std',
time_scale='seconds', only_bir=False):
'''Detects outliers based on large variations on the signal's derivatives,
either in one direction or on both at the same time.
Parameters
----------
s : pandas.Series or dask.Series
Series which will be analyzed for outlier detection.
max_thrs : int or float, default 4
Maximum threshold, i.e. no point can have a magnitude derivative value
deviate more than this threshold, in the signal that we're analyzing.
bidir_sens : float, default 0.5
Dictates how much more sensitive the algorithm is when a deviation (i.e.
large variation) is found on both sides of the data point / both
directions of the derivative. In other words, it's a factor that will be
multiplied by the usual one-directional threshold (`max_thrs`), from which
the resulting value will be used as the bidirectional threshold.
threshold_type : string, default 'std'
Determines if we're using threshold values with respect to the original
scale of derivative values, 'absolute', relative to the derivative's
mean, 'mean' or 'average', to the median, 'median' or to the standard
deviation, 'std'. As such, the possible settings are ['absolute', 'mean',
'average', 'median', 'std'].
time_scale : string or bool, default 'seconds'
How to calculate derivatives, either with respect to the index values,
on the time scale of 'seconds', 'minutes', 'hours', 'days', 'months' or
'years', or just sequentially, just getting the difference between
consecutive values, 'False'. Only used if parameter 'signal' isn't set
to 'value'.
only_bir : bool, default False
If set to True, the algorithm will only check for data points that have
large derivatives on both directions.
Returns
-------
outlier_s : pandas.Series or dask.Series
Boolean series indicating where the detected outliers are.
'''
# Calculate the difference between consecutive values
bckwrds_deriv = s.diff()
frwrds_deriv = s.diff(-1)
if time_scale is not None:
# Derivate by the index values
bckwrds_deriv = bckwrds_deriv / signal_idx_derivative(bckwrds_deriv, time_scale, periods=1)
frwrds_deriv = frwrds_deriv / signal_idx_derivative(frwrds_deriv, time_scale, periods=-1)
if threshold_type.lower() == 'absolute':
pass  # Keep the derivatives in their original scale
elif threshold_type.lower() == 'mean' or threshold_type.lower() == 'average':
bckwrds_deriv_mean = bckwrds_deriv.mean()
frwrds_deriv_mean = frwrds_deriv.mean()
if isinstance(bckwrds_deriv, dd.Series):
# Make sure that the value is computed, in case we're using Dask
bckwrds_deriv_mean = bckwrds_deriv_mean.compute()
frwrds_deriv_mean = frwrds_deriv_mean.compute()
# Normalize by the average value
bckwrds_deriv = bckwrds_deriv / bckwrds_deriv_mean
frwrds_deriv = frwrds_deriv / frwrds_deriv_mean
elif threshold_type.lower() == 'median':
bckwrds_deriv_median = bckwrds_deriv.median()
frwrds_deriv_median = frwrds_deriv.median()
if isinstance(bckwrds_deriv, dd.Series):
# Make sure that the value is computed, in case we're using Dask
bckwrds_deriv_median = bckwrds_deriv_median.compute()
frwrds_deriv_median = frwrds_deriv_median.compute()
# Normalize by the median value
bckwrds_deriv = bckwrds_deriv / bckwrds_deriv_median
frwrds_deriv = frwrds_deriv / frwrds_deriv_median
elif threshold_type.lower() == 'std':
bckwrds_deriv_mean = bckwrds_deriv.mean()
frwrds_deriv_mean = frwrds_deriv.mean()
bckwrds_deriv_std = bckwrds_deriv.std()
frwrds_deriv_std = frwrds_deriv.std()
if isinstance(bckwrds_deriv, dd.Series):
# Make sure that the values are computed, in case we're using Dask
bckwrds_deriv_mean = bckwrds_deriv_mean.compute()
frwrds_deriv_mean = frwrds_deriv_mean.compute()
bckwrds_deriv_std = bckwrds_deriv_std.compute()
frwrds_deriv_std = frwrds_deriv_std.compute()
# Normalize by the average and standard deviation values
bckwrds_deriv = (bckwrds_deriv - bckwrds_deriv_mean) / bckwrds_deriv_std
frwrds_deriv = (frwrds_deriv - frwrds_deriv_mean) / frwrds_deriv_std
else:
raise Exception(f'ERROR: Invalid value type. It must be "absolute", "mean", "average", "median" or "std", not {threshold_type}.')
# Bidirectional threshold, to be used when observing both directions of the derivative
bidir_max = bidir_sens * max_thrs
if only_bir is True:
# Search for outliers on both derivatives at the same time, always on their respective magnitudes
outlier_s = (bckwrds_deriv.abs() > bidir_max) & (frwrds_deriv.abs() > bidir_max)
else:
# Search for outliers on each individual derivative, followed by both at the same time with a lower threshold, always on their respective magnitudes
outlier_s = ((bckwrds_deriv.abs() > max_thrs) | (frwrds_deriv.abs() > max_thrs)
| ((bckwrds_deriv.abs() > bidir_max) & (frwrds_deriv.abs() > bidir_max)))
return outlier_s
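# Illustrative sketch (hypothetical signal): flagging a spike whose backward and
# forward derivatives both deviate strongly, using standard-deviation thresholds.
def _example_slopes_outlier_detect():
    example_s = pd.Series([1.0, 1.1, 1.2, 9.0, 1.3, 1.4])
    # The 9.0 spike should be the only flagged point
    return slopes_outlier_detect(example_s, max_thrs=2, time_scale=None)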
def save_chunked_data(df, file_name, n_chunks=None, batch_size=1,
id_column=None, data_path='', format='feather'):
'''Save a dataframe in chunks, i.e. in separate files, so as to prevent
memory issues and other problems when loading it back again.
Parameters
----------
df : pandas.DataFrame or dask.DataFrame
Dataframe which will be saved in chunks.
file_name : str
Name to be given to the file.
n_chunks : int, default None
Number of chunks, i.e. number of files, on which to split and save the
dataframe.
batch_size : int, default 1
Number of unique IDs saved in each file, so that each file can later be
loaded as one training or inference batch. Only used if `id_column` is set.
id_column : string, default None
Name of the column which corresponds to the sequence or subject identifier
in the dataframe. If specified, the data will be saved in files
containing a `batch_size` number of unique IDs. This is useful if we're
working with large datasets, which therefore need to be loaded file by
file, lazily, in each training or inference batch.
data_path : str, default ''
Directory path where the file will be stored.
format : str, default 'feather'
Data format used to save the dataframe. Currently, the only available
option is 'feather'.
'''
n_rows = len(df)
format = str(format).lower()
if format == 'feather':
file_ext = '.ftr'
else:
raise Exception(f'ERROR: Invalid data format "{format}". Please choose one of the currently supported formats "feather".')
if n_chunks is not None:
# Total number of rows per file
chunk_size = int(n_rows / n_chunks)
for i in du.utils.iterations_loop(range(n_chunks)):
# Get a chunk of the dataframe
if i < n_chunks-1:
df_i = df.iloc[i*chunk_size:(i+1)*chunk_size]
else:
df_i = df.iloc[i*chunk_size:]
# Reset the index, so as to make it feather compatible
df_i.reset_index(drop=True, inplace=True)
# Save the current dataframe
df_i.to_feather(f'{data_path}{file_name}_{i}{file_ext}')
# Remove the already saved dataframe from memory
del df_i
elif batch_size is not None and id_column is not None:
# List of unique sequence identifiers
ids = list(df[id_column].unique())
# Number of unique IDs
n_ids = len(ids)
# Total number of files to be saved
n_chunks = max(1, math.ceil(n_ids / batch_size))
for i in du.utils.iterations_loop(range(n_chunks)):
# Set the current batch's list of IDs
if i < n_chunks-1:
ids_i = ids[i*batch_size:(i+1)*batch_size]
else:
ids_i = ids[i*batch_size:]
# Get a chunk of the dataframe
df_i = df[df[id_column].isin(ids_i)]
# Reset the index, so as to make it feather compatible
df_i.reset_index(drop=True, inplace=True)
# Save the current dataframe
df_i.to_feather(f'{data_path}{file_name}_{i}{file_ext}')
# Remove the already saved dataframe from memory
del df_i
else:
raise Exception(f'ERROR: Invalid set of input parameters. The user must either specify a number of chunks (`n_chunks`) to save the data or a batch size (`batch_size`) and an ID column (`id_column`) on which to fetch sequences.')
def load_chunked_data(file_name, n_chunks=None, data_path='', format='feather',
dtypes=None, ordered_naming=True):
'''Load a dataframe in chunks, i.e. in separate files, so as to prevent
memory issues and other problems when loading.
Parameters
----------
file_name : str
Name of the file where the dataframe is saved.
n_chunks : int, default None
Number of chunks, i.e. number of files, needed to load the dataframe.
If left unspecified, all the files that match the naming and format will
be loaded.
data_path : str, default ''
Directory path where the file is stored.
format : str, default 'feather'
Data format used to save the dataframe. Currently, the only available
option is 'feather'.
dtypes : dict, default None
Dictionary that indicates the desired dtype for each column.
e.g. {'Var1': 'float64', 'Var2': 'UInt8', 'Var3': str}
ordered_naming : bool, default True
If set to True, the method will load data considering an ordered naming,
starting at 0 and going up to n_chunks. Otherwise, it will search for all files
that have the specified naming and format, even if it uses a different
or irregular numbering.
Returns
-------
df : pandas.DataFrame or dask.DataFrame
Loaded dataframe.
'''
# Validate the file format
format = str(format).lower()
if format == 'feather':
file_ext = '.ftr'
else:
raise Exception(f'ERROR: Invalid data format "{format}". Please choose one of the currently supported formats "feather".')
if n_chunks is None or ordered_naming is False:
# Get a list with the names of the files that can be loaded
data_files = glob(f'{data_path}{file_name}_*{file_ext}')
if n_chunks is None:
# Load all the files, if no limit is specified
n_chunks = len(data_files)
for i in du.utils.iterations_loop(range(n_chunks)):
if i == 0:
# Load the first file
if ordered_naming is True:
df = pd.read_feather(f'{data_path}{file_name}_{i}{file_ext}')
else:
df = pd.read_feather(data_files[i])
if dtypes is not None:
df = du.utils.convert_dtypes(df, dtypes=dtypes, inplace=True)
else:
# Load another file and join it with the already loaded ones
if ordered_naming is True:
tmp_df = pd.read_feather(f'{data_path}{file_name}_{i}{file_ext}')
else:
tmp_df = pd.read_feather(data_files[i])
if dtypes is not None:
tmp_df = du.utils.convert_dtypes(tmp_df, dtypes=dtypes, inplace=True)
df = pd.concat((df, tmp_df))
# Remove the already concatenated dataframe from memory
del tmp_df
return df
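# Illustrative sketch (hypothetical path and column names, assuming pyarrow is
# installed and the 'tmp/' directory exists): saving a dataframe in per-subject
# feather chunks and loading it back.
def _example_chunked_io():
    example_df = pd.DataFrame({'subject_id': [1, 1, 2, 2, 3],
                               'heart_rate': [80, 81, 75, 76, 90]})
    # One file per batch of 2 unique subject IDs -> 'tmp/example_data_0.ftr' and 'tmp/example_data_1.ftr'
    save_chunked_data(example_df, file_name='example_data', batch_size=2,
                      id_column='subject_id', data_path='tmp/')
    # Concatenate all matching files back into a single dataframe
    return load_chunked_data(file_name='example_data', data_path='tmp/')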
Functions
def apply_minmax_denorm(value, df=None, min=None, max=None, categories_mins=None, categories_maxs=None, groupby_columns=None)
Performs minmax denormalization when used inside a Pandas or Dask apply function.
Parameters
----------
value : int or float
    Input normalized value.
df : pandas.DataFrame or dask.DataFrame, default None
    Original pandas dataframe which is used to retrieve the necessary statistical values used in group denormalization, i.e. when values are denormalized according to their corresponding categories.
min : int or float, default None
    Minimum value to be used in the minmax denormalization.
max : int or float, default None
    Maximum value to be used in the minmax denormalization.
categories_mins : dict, default None
    Dictionary containing the minimum values for each set of categories.
categories_maxs : dict, default None
    Dictionary containing the maximum values for each set of categories.
groupby_columns : string or list of strings, default None
    Name(s) of the column(s) that contains the categories from which statistical values (minimum and maximum) are retrieved.
Returns
-------
value_denorm : int or float
    Minmax denormalized value.
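For illustration, a minimal usage sketch (hypothetical column names and statistics), applying the helper element-wise through Pandas:
from data_utils.data_processing import apply_minmax_denorm
import pandas as pd
norm_df = pd.DataFrame({'heart_rate': [0.0, 0.5, 1.0]})
hr_min, hr_max = 40, 180  # statistics kept from the original normalization step
norm_df['heart_rate'] = norm_df['heart_rate'].apply(lambda x: apply_minmax_denorm(x, min=hr_min, max=hr_max))
# 0.0 -> 40.0, 0.5 -> 110.0, 1.0 -> 180.0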
def apply_minmax_norm(value, df=None, min=None, max=None, categories_mins=None, categories_maxs=None, groupby_columns=None)
Performs minmax normalization when used inside a Pandas or Dask apply function.
Parameters
----------
value : int or float
    Original, unnormalized value.
df : pandas.DataFrame or dask.DataFrame, default None
    Original pandas dataframe which is used to retrieve the necessary statistical values used in group normalization, i.e. when values are normalized according to their corresponding categories.
min : int or float, default None
    Minimum value to be used in the minmax normalization.
max : int or float, default None
    Maximum value to be used in the minmax normalization.
categories_mins : dict, default None
    Dictionary containing the minimum values for each set of categories.
categories_maxs : dict, default None
    Dictionary containing the maximum values for each set of categories.
groupby_columns : string or list of strings, default None
    Name(s) of the column(s) that contains the categories from which statistical values (minimum and maximum) are retrieved.
Returns
-------
value_norm : int or float
    Minmax normalized value.
def apply_zscore_denorm(value, df=None, mean=None, std=None, categories_means=None, categories_stds=None, groupby_columns=None)
Performs z-score denormalization when used inside a Pandas or Dask apply function.
Parameters
----------
value : int or float
    Input normalized value.
df : pandas.DataFrame or dask.DataFrame, default None
    Original pandas dataframe which is used to retrieve the necessary statistical values used in group denormalization, i.e. when values are denormalized according to their corresponding categories.
mean : int or float, default None
    Average (mean) value to be used in the z-score denormalization.
std : int or float, default None
    Standard deviation value to be used in the z-score denormalization.
categories_means : dict, default None
    Dictionary containing the average values for each set of categories.
categories_stds : dict, default None
    Dictionary containing the standard deviation values for each set of categories.
groupby_columns : string or list of strings, default None
    Name(s) of the column(s) that contains the categories from which statistical values (mean and standard deviation) are retrieved.
Returns
-------
value_denorm : int or float
    Z-score denormalized value.
def apply_zscore_norm(value, df=None, mean=None, std=None, categories_means=None, categories_stds=None, groupby_columns=None)
Performs z-score normalization when used inside a Pandas or Dask apply function.
Parameters
----------
value : int or float
    Original, unnormalized value.
df : pandas.DataFrame or dask.DataFrame, default None
    Original pandas dataframe which is used to retrieve the necessary statistical values used in group normalization, i.e. when values are normalized according to their corresponding categories.
mean : int or float, default None
    Average (mean) value to be used in the z-score normalization.
std : int or float, default None
    Standard deviation value to be used in the z-score normalization.
categories_means : dict, default None
    Dictionary containing the average values for each set of categories.
categories_stds : dict, default None
    Dictionary containing the standard deviation values for each set of categories.
groupby_columns : string or list of strings, default None
    Name(s) of the column(s) that contains the categories from which statistical values (mean and standard deviation) are retrieved.
Returns
-------
value_norm : int or float
    Z-score normalized value.
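For illustration, a minimal usage sketch (hypothetical data) of the group-based variant, in which each row is normalized with the statistics of its own category:
from data_utils.data_processing import apply_zscore_norm
import pandas as pd
raw_df = pd.DataFrame({'unit': ['ICU', 'ICU', 'ward', 'ward'],
                       'heart_rate': [80.0, 100.0, 70.0, 90.0]})
cat_means = raw_df.groupby('unit')['heart_rate'].mean().to_dict()
cat_stds = raw_df.groupby('unit')['heart_rate'].std().to_dict()
raw_df['heart_rate_norm'] = raw_df.apply(
    lambda row: apply_zscore_norm(row['heart_rate'], df=row,
                                  categories_means=cat_means,
                                  categories_stds=cat_stds,
                                  groupby_columns='unit'), axis=1)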
def category_to_feature(df, categories_feature, values_feature, min_len=None, see_progress=True, inplace=False)
Convert a categorical column and its corresponding values column into new features, one for each category. WARNING: Currently not working properly on a Dask dataframe. Apply .compute() to the dataframe to convert it to Pandas, before passing it to this method. If the data is too big to run on Pandas, use the category_to_feature_big_data method.
Parameters
----------
df : pandas.DataFrame or dask.DataFrame
    Dataframe on which to add the new features.
categories_feature : string
    Name of the feature that contains the categories that will be converted to individual features.
values_feature : string
    Name of the feature that has each category's corresponding value, which may or may not be a category on its own (e.g. it could be numeric values).
min_len : int, default None
    If defined, only the categories that appear on at least `min_len` rows are converted to features.
see_progress : bool, default True
    If set to True, a progress bar will show up indicating the execution of the normalization calculations.
inplace : bool, default False
    If set to True, the original dataframe will be used and modified directly. Otherwise, a copy will be created and returned, without changing the original dataframe.
Returns
-------
data_df : pandas.DataFrame or dask.DataFrame
    Dataframe with the newly created features.
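For illustration, a minimal usage sketch (hypothetical lab data) of the conversion performed by this method:
from data_utils.data_processing import category_to_feature
import pandas as pd
lab_df = pd.DataFrame({'lab_test': ['glucose', 'sodium', 'glucose'],
                       'lab_value': [90, 140, 95]})
wide_df = category_to_feature(lab_df, categories_feature='lab_test', values_feature='lab_value')
# Adds a 'glucose' column ([90, NaN, 95]) and a 'sodium' column ([NaN, 140, NaN])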
def category_to_feature_big_data(df, categories_feature, values_feature, min_len=None, see_progress=True)
-
Convert a categorical column and its corresponding values column into new features, one for each category. Optimized for very big Dask dataframes, which can't be processed as a whole Pandas dataframe.
Parameters
df : dask.DataFrame
- Dataframe on which to add the new features.
categories_feature : string
- Name of the feature that contains the categories that will be converted to individual features.
values_feature : string
- Name of the feature that has each category's corresponding value, which may or may not be a category on its own (e.g. it could be numeric values).
min_len : int, default None
- If defined, only the categories that appear in at least min_len rows are converted to features.
see_progress : bool, default True
- If set to True, a progress bar will show up indicating the execution of the category conversion.
Returns
data_df : dask.DataFrame
- Dataframe with the newly created features.
Expand source code
def category_to_feature_big_data(df, categories_feature, values_feature, min_len=None, see_progress=True): '''Convert a categorical column and its corresponding values column into new features, one for each category. Optimized for very big Dask dataframes, which can't be processed as a whole Pandas dataframe. Parameters ---------- df : dask.DataFrame Dataframe on which to add the new features. categories_feature : string Name of the feature that contains the categories that will be converted to individual features. values_feature : string Name of the feature that has each category's corresponding value, which may or may not be a category on its own (e.g. it could be numeric values). min_len : int, default None If defined, only the categories that appear on at least `min_len` rows are converted to features. see_progress : bool, default True If set to True, a progress bar will show up indicating the execution of the normalization calculations. Returns ------- data_df : dask.DataFrame Dataframe with the newly created features. ''' # Create a list with Pandas dataframe versions of each partition of the # original Dask dataframe df_list = [] print('Converting categories to features in each partition...') for n in utils.iterations_loop(range(df.npartitions), see_progress=see_progress): # Process each partition separately in Pandas tmp_df = df.get_partition(n).compute() tmp_df = category_to_feature(tmp_df, categories_feature=categories_feature, values_feature=values_feature, min_len=min_len, see_progress=see_progress) df_list.append(tmp_df) # Rejoin all the partitions into a Dask dataframe with the same number of # partitions it originally had print('Rejoining partitions into a Dask dataframe...') data_df = dd.from_pandas(pd.concat(df_list, sort=False), npartitions=df.npartitions) print('Done!') return data_df
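A sketch of the same conversion on a partitioned Dask dataframe (the toy data below is hypothetical; in practice the dataframe would come from a larger-than-memory source):

import dask.dataframe as dd
import pandas as pd
import data_utils as du

pdf = pd.DataFrame({'id': [1, 1, 2, 2],
                    'lab_name': ['glucose', 'sodium', 'glucose', 'sodium'],
                    'lab_value': [5.4, 140.0, 6.1, 138.0]})
ddf = dd.from_pandas(pdf, npartitions=2)
# Each partition is converted in Pandas and the results are rejoined into a Dask dataframe
ddf = du.data_processing.category_to_feature_big_data(ddf,
                                                      categories_feature='lab_name',
                                                      values_feature='lab_value')

Note that, since each partition is pivoted independently, a category that is absent from a given partition only gets its column after the final concatenation, filled with NaN for that partition's rows.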
def clean_categories_naming(df, column, clean_missing_values=True, specific_nan_strings=[], lower_case=False)
-
Change categorical values to only have lower case letters and underscores.
Parameters
df : pandas.DataFrame or dask.DataFrame
- Dataframe that contains the column to be cleaned.
column : string
- Name of the dataframe's column which needs to have its string values standardized.
clean_missing_values : bool, default True
- If set to True, the algorithm will search for missing value representations and replace them with the standard, NumPy NaN value.
specific_nan_strings : list of strings, default []
- Parameter where the user can specify additional strings that should correspond to missing values.
lower_case : bool, default False
- If set to True, all strings will be converted to lower case.
Returns
df : pandas.DataFrame or dask.DataFrame
- Dataframe with its string column already cleaned.
Expand source code
def clean_categories_naming(df, column, clean_missing_values=True, specific_nan_strings=[], lower_case=False): '''Change categorical values to only have lower case letters and underscores. Parameters ---------- df : pandas.DataFrame or dask.DataFrame Dataframe that contains the column to be cleaned. column : string Name of the dataframe's column which needs to have its string values standardized. clean_missing_values : bool, default True If set to True, the algorithm will search for missing value representations and replace them with the standard, NumPy NaN value. specific_nan_strings : list of strings, default [] Parameter where the user can specify additional strings that should correspond to missing values. lower_case : bool, default False If set to True, all strings will be converted to lower case. Returns ------- df : pandas.DataFrame or dask.DataFrame Dataframe with its string column already cleaned. ''' # Fix the seeting of all lower case characters according to the `lower_case` parameter clean_naming_prtl = partial(clean_naming, lower_case=lower_case) if isinstance(df, dd.DataFrame): df[column] = (df[column].map(clean_naming_prtl, meta=('x', str))) if clean_missing_values is True: df[column] = df[column].apply(lambda x: standardize_missing_values(x, specific_nan_strings), meta=df[column]._meta.dtypes) else: df[column] = (df[column].map(clean_naming_prtl)) if clean_missing_values is True: df[column] = df[column].apply(lambda x: standardize_missing_values(x, specific_nan_strings)) return df
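For illustration, a hedged sketch on a hypothetical drug column (both the dataframe and the column name are made up):

import pandas as pd
import data_utils as du

df = pd.DataFrame({'drug': ['Aspirin 100mg', 'Heparin, IV']})
# Standardize the category strings (lower case, underscores instead of spaces
# and commas) and replace recognized missing-value markers with NaN
df = du.data_processing.clean_categories_naming(df, column='drug', lower_case=True)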
def clean_naming(x, lower_case=True)
-
Change strings to only have lower case letters and underscores.
Parameters
x : string or list of strings
- String(s) on which to clean the naming, standardizing it.
lower_case : bool, default True
- If set to True, all strings will be converted to lower case.
Returns
x : string or list of strings
- Cleaned string(s).
Expand source code
def clean_naming(x, lower_case=True): '''Change strings to only have lower case letters and underscores. Parameters ---------- x : string or list of strings String(s) on which to clean the naming, standardizing it. lower_case : bool, default True If set to True, all strings will be converted to lower case. Returns ------- x : string or list of strings Cleaned string(s). ''' if 'pandas.core.indexes.base.Index' in str(type(x)): # If the user input is a dataframe index (e.g. df.columns), convert it to a list x = list(x) if isinstance(x, list): if lower_case is True: x = [string.lower().replace(' ', '') .replace(' ', '_') .replace(',', '_and') for string in x] else: x = [string.replace(' ', '') .replace(' ', '_') .replace(',', '_and') for string in x] elif (isinstance(x, pd.DataFrame) or isinstance(x, pd.Series) or isinstance(x, dd.DataFrame) or isinstance(x, dd.Series)): raise Exception('ERROR: Wrong method. When using dataframes or series, use clean_categories_naming() method instead.') else: if lower_case is True: x = (str(x).lower().replace(' ', '') .replace(' ', '_') .replace(',', '_and')) else: x = (str(x).replace(' ', '') .replace(' ', '_') .replace(',', '_and')) return x
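A common use, hinted at in the source above, is standardizing a dataframe's column names (the names below are hypothetical):

import pandas as pd
import data_utils as du

df = pd.DataFrame(columns=['Heart Rate', 'Systolic BP, invasive'])
# Accepts a pandas Index (e.g. df.columns) and returns a cleaned list of names
df.columns = du.data_processing.clean_naming(df.columns, lower_case=True)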
def denormalize_data(df=None, data=None, id_columns=['patientunitstayid', 'ts'], denormalization_method='z-score', columns_to_denormalize=None, columns_to_denormalize_categ=None, categ_columns=None, see_progress=True, search_by_dtypes=False, inplace=False, means=None, stds=None, mins=None, maxs=None, feature_columns=None)
-
Performs data denormalization to a continuous valued tensor or dataframe, changing the scale of the data.
Parameters
df : pandas.DataFrame or dask.DataFrame, default None
- Original Pandas or Dask dataframe which is used to correctly calculate the necessary statistical values used in the denormalization. These values can't be calculated from the tensor as it might have been padded. If the data tensor isn't specified, the denormalization is applied directly on the dataframe.
data : torch.Tensor or numpy.Array, default None
- PyTorch tensor or NumPy array corresponding to the data which will be denormalized by the specified denormalization method. If the data isn't specified, the denormalization is applied directly on the dataframe.
id_columns : string or list of strings, default ['patientunitstayid', 'ts']
- List of column names which represent identifier columns. These are not supposed to be denormalized.
denormalization_method : string, default 'z-score'
- Specifies the denormalization method used, i.e. which normalization is being reversed. It can be 'z-score', reversing a z-score normalization (where the data was subtracted of its mean and divided by the standard deviation, giving it zero average and unit variance), or 'min-max', reversing a min-max normalization (where the data was scaled to a fixed range from 0 to 1).
columns_to_denormalize : string or list of strings, default None
- If specified, the columns provided in the list are the only ones that will be denormalized. If set to False, no column will be denormalized directly, although columns can still be denormalized in groups of categories, if specified in the columns_to_denormalize_categ parameter. Otherwise, all continuous columns will be denormalized.
columns_to_denormalize_categ : tuple or list of tuples, default None
- If specified, the columns provided in the list are going to be denormalized on their categories. That is, the values (column 2 in the tuple) are denormalized with the stats of their respective categories (column 1 of the tuple). Otherwise, no column will be denormalized on its categories.
categ_columns : string or list of strings, default None
- If specified, the columns in the list, which represent categorical features that either are a label or will be embedded, aren't going to be denormalized.
see_progress : bool, default True
- If set to True, a progress bar will show up indicating the execution of the denormalization calculations.
search_by_dtypes : bool, default False
- If set to True, the method will only look for boolean columns based on their data type. This is only reliable if all the columns' data types have been properly set.
inplace : bool, default False
- If set to True, the original dataframe will be used and modified directly. Otherwise, a copy will be created and returned, without changing the original dataframe.
Returns
data : pandas.DataFrame or dask.DataFrame or torch.Tensor
- Denormalized Pandas or Dask dataframe or PyTorch tensor.
Expand source code
def denormalize_data(df=None, data=None, id_columns=['patientunitstayid', 'ts'], denormalization_method='z-score', columns_to_denormalize=None, columns_to_denormalize_categ=None, categ_columns=None, see_progress=True, search_by_dtypes=False, inplace=False, means=None, stds=None, mins=None, maxs=None, feature_columns=None): '''Performs data denormalization to a continuous valued tensor or dataframe, changing the scale of the data. Parameters ---------- df : pandas.DataFrame or dask.DataFrame, default None Original Pandas or Dask dataframe which is used to correctly calculate the necessary statistical values used in the denormalization. These values can't be calculated from the tensor as it might have been padded. If the data tensor isn't specified, the denormalization is applied directly on the dataframe. data : torch.Tensor or numpy.Array, default None PyTorch tensor or NumPy array corresponding to the data which will be denormalized by the specified denormalization method. If the data isn't specified, the denormalization is applied directly on the dataframe. id_columns : string or list of strings, default ['subject_id', 'ts'] List of columns names which represent identifier columns. These are not supposed to be denormalized. denormalization_method : string, default 'z-score' Specifies the denormalization method used. It can be a z-score denormalization, where the data is subtracted of its mean and divided by the standard deviation, which makes it have zero average and unit variance, much like a standard normal distribution; it can be a min-max denormalization, where the data is subtracted by its minimum value and then divided by the difference between the minimum and the maximum value, getting to a fixed range from 0 to 1. columns_to_denormalize : string or list of strings, default None If specified, the columns provided in the list are the only ones that will be denormalized. If set to False, no column will be denormalized directly, although columns can still be denormalized in groups of categories, if specified in the `columns_to_denormalize_categ` parameter. Otherwise, all continuous columns will be denormalized. columns_to_denormalize_categ : tuple or list of tuples of tuples, default None If specified, the columns provided in the list are going to be denormalized on their categories. That is, the values (column 2 in the tuple) are denormalized with stats of their respective categories (column 1 of the tuple). Otherwise, no column will be denormalized on their categories. categ_columns : string or list of strings, default None If specified, the columns in the list, which represent categorical features, which either are a label or will be embedded, aren't going to be denormalized. see_progress : bool, default True If set to True, a progress bar will show up indicating the execution of the denormalization calculations. search_by_dtypes : bool, default False If set to True, the method will only look for boolean columns based on their data type. This is only reliable if all the columns' data types have been properly set. inplace : bool, default False If set to True, the original dataframe will be used and modified directly. Otherwise, a copy will be created and returned, without changing the original dataframe. Returns ------- data : pandas.DataFrame or dask.DataFrame or torch.Tensor Denormalized Pandas or Dask dataframe or PyTorch tensor. 
''' # [TODO] Add the option in denormalize_data to denormalize a data tensor # using a norm_stats dictionary instead of fetching the denormalization # stats from the original dataframe if feature_columns is None and df is not None: # List of all columns in the dataframe feature_columns = list(df.columns) # Check if specific columns have been specified for denormalization if columns_to_denormalize is None: # Denormalize all non identifier continuous columns, ignore one hot encoded ones columns_to_denormalize = feature_columns.copy() if id_columns is not None: # Make sure that the id_columns is a list if isinstance(id_columns, str): id_columns = [id_columns] if not isinstance(id_columns, list): raise Exception(f'ERROR: The `id_columns` argument must be specified as either a single string or a list of strings. Received input with type {type(id_columns)}.') # List of all columns in the dataframe, except the ID columns [columns_to_denormalize.remove(col) for col in id_columns] if categ_columns is not None: # Make sure that the categ_columns is a list if isinstance(categ_columns, str): categ_columns = [categ_columns] if not isinstance(categ_columns, list): raise Exception(f'ERROR: The `categ_columns` argument must be specified as either a single string or a list of strings. Received input with type {type(categ_columns)}.') # Prevent all features that will be embedded from being denormalized [columns_to_denormalize.remove(col) for col in categ_columns] # List of boolean or one hot encoded columns boolean_cols = search_explore.list_boolean_columns(df[columns_to_denormalize], search_by_dtypes=search_by_dtypes) if boolean_cols is not None: # Prevent boolean features from being denormalized [columns_to_denormalize.remove(col) for col in boolean_cols] # Remove all non numeric columns that could be left columns_to_denormalize = [col for col in columns_to_denormalize if df[col].dtype == int or df[col].dtype == float] if columns_to_denormalize is None: print('No columns to denormalize, returning the original dataframe.') return df # Make sure that the columns_to_denormalize is a list if isinstance(columns_to_denormalize, str): columns_to_denormalize = [columns_to_denormalize] if not isinstance(columns_to_denormalize, list) and not isinstance(columns_to_denormalize, bool): raise Exception(f'ERROR: The `columns_to_denormalize` argument must be specified as either a single string, a list of strings or a boolean. Received input with type {type(columns_to_denormalize)}.') if type(denormalization_method) is not str: raise ValueError('Argument denormalization_method should be a string. Available options are "z-score" and "min-max".') if denormalization_method.lower() == 'z-score': if columns_to_denormalize is not False: # Calculate the means and standard deviations if means is None: means = df[columns_to_denormalize].mean() if stds is None: stds = df[columns_to_denormalize].std() # Check if there are constant features if isinstance(stds, pd.Series): const_feat = list(stds[stds == 0].index) elif isinstance(stds, dict): const_feat = [feat for feat in stds.keys() if stds[feat] == 0] if len(const_feat) > 0: # Prevent constant features from being denormalized [columns_to_denormalize.remove(col) for col in const_feat] means = means.drop(const_feat) stds = stds.drop(const_feat) warnings.warn(f'Found columns {const_feat} to be constant throughout all the data. 
They should be removed as no insight will be extracted from them.') if isinstance(df, dd.DataFrame): # Make sure that the values are computed, in case we're using Dask means = means.compute() stds = stds.compute() # Check if the data being denormalized is directly the dataframe if data is None: if not inplace: # Make a copy of the data to avoid potentially unwanted changes to the original dataframe data = df.copy() else: # Use the original dataframe data = df # Denormalize the right columns if columns_to_denormalize is not False: print(f'z-score denormalizing columns {columns_to_denormalize}...') data[columns_to_denormalize] = data[columns_to_denormalize] * stds + means if columns_to_denormalize_categ is not None: # Make sure that the columns_to_denormalize_categ is a list if isinstance(columns_to_denormalize_categ, tuple): columns_to_denormalize_categ = [columns_to_denormalize_categ] if not isinstance(columns_to_denormalize_categ, list): raise Exception(f'ERROR: The `columns_to_denormalize_categ` argument must be specified as either a single tuple or a list of tuples. Received input with type {type(columns_to_denormalize_categ)}.') print(f'z-score denormalizing columns {columns_to_denormalize_categ} by their associated categories...') for col_tuple in utils.iterations_loop(columns_to_denormalize_categ, see_progress=see_progress): categ_columns = col_tuple[0] column_to_denormalize = col_tuple[1] # Calculate the means and standard deviations means_grpb = df.groupby(categ_columns)[ column_to_denormalize].mean() stds_grpb = df.groupby(categ_columns)[ column_to_denormalize].std() if isinstance(df, dd.DataFrame): # Make sure that the values are computed, in case we're using Dask means_grpb = means.compute() stds_grpb = stds.compute() # Get the categories columns as a numpy array, so as to # index the groupby-resulting dataframes of mean and standard # deviation values cat_arr = df[categ_columns].to_numpy() if isinstance(categ_columns, list) and len(categ_columns) > 1: # Convert the sets of values into tuples so as to be # properly readable as dataframe indices cat_arr = list(map(tuple, cat_arr)) # Get the mean and standard deviation values in the same # order as the original dataframe's row order means_cat = means_grpb.loc[cat_arr].to_numpy() stds_cat = stds_grpb.loc[cat_arr].to_numpy() # Denormalize the right categories data[column_to_denormalize] = data[column_to_denormalize] * stds_cat + means_cat # Otherwise, the array is denormalized else: if not inplace: # Make a copy of the data to avoid potentially unwanted changes to the original array if isinstance(data, torch.Tensor): data = data.clone() else: data = data.copy() else: # Use the original array data = data if columns_to_denormalize is not False: # Dictionaries to retrieve the mean and standard deviation values if not isinstance(means, dict): means = dict(means) if not isinstance(stds, dict): stds = dict(stds) # Dictionary to convert the the array's column indices into the dataframe's column names idx_to_name = dict(enumerate(feature_columns)) # Dictionary to convert the dataframe's column names into the array's column indices name_to_idx = dict([(t[1], t[0]) for t in enumerate(feature_columns)]) # List of indices of the array's columns which are needing denormalization array_columns_to_denormalize = [name_to_idx[name] for name in columns_to_denormalize] # Denormalize the right columns print(f'z-score denormalizing columns {columns_to_denormalize}...') for col in utils.iterations_loop(array_columns_to_denormalize, 
see_progress=see_progress): if len(data.shape) == 3: data[:, :, col] = data[:, :, col] * stds[idx_to_name[col]] + means[idx_to_name[col]] elif len(data.shape) == 2: data[:, col] = data[:, col] * stds[idx_to_name[col]] + means[idx_to_name[col]] else: raise Exception(f'ERROR: The data array or tensor must be either two or three-dimensional. The provided data has {len(data.shape)} dimensions.') return data elif denormalization_method.lower() == 'min-max': if columns_to_denormalize is not False: mins = df[columns_to_denormalize].min() maxs = df[columns_to_denormalize].max() # Check if there are constant features const_feat = list(mins[mins == maxs].index) if len(const_feat) > 0: # Prevent constant features from being denormalized [columns_to_denormalize.remove(col) for col in const_feat] mins = mins.drop(const_feat) maxs = maxs.drop(const_feat) warnings.warn(f'Found columns {const_feat} to be constant throughout all the data. They should be removed as no insight will be extracted from them.') if isinstance(df, dd.DataFrame): # Make sure that the values are computed, in case we're using Dask mins = means.compute() maxs = maxs.compute() # Check if the data being denormalized is directly the dataframe if data is None: if not inplace: # Make a copy of the data to avoid potentially unwanted changes to the original dataframe if isinstance(data, torch.Tensor): data = data.clone() else: data = data.copy() else: # Use the original dataframe data = df if columns_to_denormalize is not False: # Denormalize the right columns print(f'min-max denormalizing columns {columns_to_denormalize}...') data[columns_to_denormalize] = data[columns_to_denormalize] * (maxs - mins) + mins if columns_to_denormalize_categ is not None: # Make sure that the columns_to_denormalize_categ is a list if isinstance(columns_to_denormalize_categ, tuple): columns_to_denormalize_categ = [columns_to_denormalize_categ] if not isinstance(columns_to_denormalize_categ, list): raise Exception(f'ERROR: The `columns_to_denormalize_categ` argument must be specified as either a single tuple or a list of tuples. 
Received input with type {type(columns_to_denormalize_categ)}.') print(f'min-max denormalizing columns {columns_to_denormalize_categ} by their associated categories...') for col_tuple in columns_to_denormalize_categ: categ_columns = col_tuple[0] column_to_denormalize = col_tuple[1] # Calculate the minimum and maximum values mins_grpb = df.groupby(col_tuple[0])[col_tuple[1]].min() maxs_grpb = df.groupby(col_tuple[0])[col_tuple[1]].max() if isinstance(df, dd.DataFrame): # Make sure that the values are computed, in case we're using Dask mins_grpb = mins_grpb.compute() maxs_grpb = maxs_grpb.compute() # Get the categories columns as a numpy array, so as to # index the groupby-resulting dataframes of minimum and # maximum values cat_arr = df[categ_columns].to_numpy() if isinstance(categ_columns, list) and len(categ_columns) > 1: # Convert the sets of values into tuples so as to be # properly readable as dataframe indices cat_arr = list(map(tuple, cat_arr)) # Get the minimum and maximum values in the same # order as the original dataframe's row order mins_cat = mins_grpb.loc[cat_arr].to_numpy() maxs_cat = maxs_grpb.loc[cat_arr].to_numpy() # Denormalize the right categories data[column_to_denormalize] = data[column_to_denormalize] * (maxs_cat - mins_cat) + mins_cat # Otherwise, the array is denormalized else: if not inplace: # Make a copy of the data to avoid potentially unwanted changes to the original array data = data.clone() else: # Use the original array data = data if columns_to_denormalize is not False: # Dictionaries to retrieve the min and max values column_mins = dict(mins) column_maxs = dict(maxs) # Dictionary to convert the the array's column indices into the dataframe's column names idx_to_name = dict(enumerate(feature_columns)) # Dictionary to convert the dataframe's column names into the array's column indices name_to_idx = dict([(t[1], t[0]) for t in enumerate(feature_columns)]) # List of indices of the array's columns which are needing denormalization array_columns_to_denormalize = [name_to_idx[name] for name in columns_to_denormalize] # Denormalize the right columns print(f'min-max denormalizing columns {columns_to_denormalize}...') for col in utils.iterations_loop(array_columns_to_denormalize, see_progress=see_progress): if len(data.shape) == 3: data[:, :, col] = (data[:, :, col] * (column_maxs[idx_to_name[col]] - column_mins[idx_to_name[col]]) + column_mins[idx_to_name[col]]) elif len(data.shape) == 2: data[:, col] = (data[:, col] * (column_maxs[idx_to_name[col]] - column_mins[idx_to_name[col]]) + column_mins[idx_to_name[col]]) else: raise Exception(f'ERROR: The data array or tensor must be either two or three-dimensional. The provided data has {len(data.shape)} dimensions.') return data else: raise ValueError(f'{denormalization_method} isn\'t a valid denormalization method. Available options \ are "z-score" and "min-max".')
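A hedged sketch of the intended round trip: a tensor that was z-score normalized with stats drawn from the original dataframe is mapped back to the original scale. The dataframe df, the tensor normalized_tensor and its construction are hypothetical:

import data_utils as du

# df is the original, unnormalized dataframe the tensor was built from;
# normalized_tensor is assumed to be a (samples x timestamps x features) tensor
# whose feature order matches df's columns
denorm_tensor = du.data_processing.denormalize_data(df=df,
                                                    data=normalized_tensor,
                                                    id_columns=['patientunitstayid', 'ts'],
                                                    denormalization_method='z-score')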
def get_clean_label(orig_label, clean_labels, column_name=None)
-
Gets the clean version of a given label.
Parameters
orig_label : string
- Original label name that needs to be converted to the new format.
clean_labels : dict
- Dictionary that converts each original label into a new, cleaner designation.
column_name : string, default None
- Optional parameter to indicate a column name, which is used to better specify the missing values.
Returns
key : string
- Returns the dictionary key from clean_labels that corresponds to the translation given to the input label orig_label.
Expand source code
def get_clean_label(orig_label, clean_labels, column_name=None): '''Gets the clean version of a given label. Parameters ---------- orig_label : string Original label name that needs to be converted to the new format. clean_labels : dict Dictionary that converts each original label into a new, cleaner designation. column_name : string, default None Optional parameter to indicate a column name, which is used to specify better the missing values. Returns ------- key : string Returns the dictionary key from clean_labels that corresponds to the translation given to the input label orig_label. ''' for key in clean_labels: if orig_label in clean_labels[key]: return key # Remaining labels (or lack of one) are considered as missing data if column_name is not None: return f'{column_name}_missing_value' else: return 'missing_value'
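A small, self-contained example (the label names are invented for illustration):

import data_utils as du

clean_labels = {'died': ['DIED', 'Expired', 'death'],
                'survived': ['Alive', 'Discharged']}
du.data_processing.get_clean_label('Expired', clean_labels)      # 'died'
du.data_processing.get_clean_label('???', clean_labels,
                                   column_name='outcome')        # 'outcome_missing_value'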
def load_chunked_data(file_name, n_chunks=None, data_path='', format='feather', dtypes=None, ordered_naming=True)
-
Load a dataframe in chunks, i.e. in separate files, so as to prevent memory issues and other problems when loading.
Parameters
file_name : str
- Name of the file where the dataframe is saved.
n_chunks : int, default None
- Number of chunks, i.e. number of files, needed to load the dataframe. If left unspecified, all the files that match the naming and format will be loaded.
data_path : str, default ''
- Directory path where the file is stored.
format : str, default 'feather'
- Data format used to save the dataframe. Currently the only available option is 'feather'.
dtypes : dict, default None
- Dictionary that indicates the desired dtype for each column, e.g. {'Var1': 'float64', 'Var2': 'UInt8', 'Var3': str}.
ordered_naming : bool, default True
- If set to True, the method will load data assuming an ordered naming, starting at 0 and going up to n_chunks. Otherwise, it will search for all files that have the specified naming and format, even if they use a different or irregular numbering.
Returns
df : pandas.DataFrame or dask.DataFrame
- Loaded dataframe.
Expand source code
def load_chunked_data(file_name, n_chunks=None, data_path='', format='feather', dtypes=None, ordered_naming=True): '''Load a dataframe in chunks, i.e. in separate files, so as to prevent memory issues and other problems when loading. Parameters ---------- file_name : str Name of the file where the dataframe is saved. n_chunks : int, default None Number of chunks, i.e. number of files, needed to load the dataframe. If left unspecified, all the files that match the naming and format will be loaded. data_path : str, default '' Directory path where the file is stored. format : str, default 'feather' Data format used to saved the dataframe. Currently available options are 'feather'. dtypes : dict, default None Dictionary that indicates the desired dtype for each column. e.g. {'Var1': 'float64', 'Var2': 'UInt8', 'Var3': str} ordered_naming : bool, default True If set to True, the method will load data considering an ordered naming, staring in 0 until n_chunks. Otherwise, it will search for all files that have the specified naming and format, even if it uses a different or irregular numbering. Returns ------- df : pandas.DataFrame or dask.DataFrame Loaded dataframe. ''' # Validate the file format format = str(format).lower() if format == 'feather': file_ext = '.ftr' else: raise Exception(f'ERROR: Invalid data format "{format}". Please choose one of the currently supported formats "feather".') if n_chunks is None or ordered_naming is False: # Get a list with the names of the files that can be loaded data_files = glob(f'{data_path}{file_name}_*{file_ext}') if n_chunks is None: # Load all the files, if no limit is specified n_chunks = len(data_files) for i in du.utils.iterations_loop(range(n_chunks)): if i == 0: # Load the first file if ordered_naming is True: df = pd.read_feather(f'{data_path}{file_name}_{i}{file_ext}') else: df = pd.read_feather(data_files[i]) if dtypes is not None: df = du.utils.convert_dtypes(df, dtypes=dtypes, inplace=True) else: # Load another file and join it with the already loaded ones if ordered_naming is True: tmp_df = pd.read_feather(f'{data_path}{file_name}_{i}{file_ext}') else: tmp_df = pd.read_feather(data_files[i]) if dtypes is not None: tmp_df = du.utils.convert_dtypes(tmp_df, dtypes=dtypes, inplace=True) df = pd.concat((df, tmp_df)) # Remove the already concatenated dataframe from memory del tmp_df return df
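A usage sketch, assuming the dataframe was previously saved in feather chunks named like dmdf_0.ftr, dmdf_1.ftr, ... inside a data/ folder (both the file prefix and the folder are hypothetical):

import data_utils as du

df = du.data_processing.load_chunked_data(file_name='dmdf',
                                          data_path='data/',
                                          format='feather',
                                          dtypes={'Var1': 'float64', 'Var2': 'UInt8'})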
def merge_columns(df, cols_to_merge=None, drop_old_cols=True, separator=';', join_strings=False, see_progress=True, inplace=False)
-
Merge columns that have been created, as a consequence of a dataframe merge operation, resulting in duplicate columns with suffixes.
Parameters
df : pandas.DataFrame or dask.DataFrame
- Dataframe that will have its columns merged.
cols_to_merge : string or list of strings, default None
- The columns which will be regenerated by merging their duplicates. If not specified, the algorithm will search for columns with suffixes.
drop_old_cols : bool, default True
- If set to True, the preexisting duplicate columns will be removed.
separator : string, default ';'
- Symbol that concatenates each string's words, which will be used to join the inputs if they are both strings.
join_strings : bool, default False
- If set to True, in case of receiving two string inputs, the algorithm will join them using the defined separator. Otherwise, the shortest string will be returned.
see_progress : bool, default True
- If set to True, a progress bar will show up indicating the execution of the column merging.
inplace : bool, default False
- If set to True, the original tensor or dataframe will be used and modified directly. Otherwise, a copy will be created and returned, without changing the original tensor or dataframe.
Returns
data_df : pandas.DataFrame or dask.DataFrame
- Dataframe with the new merged columns.
Expand source code
def merge_columns(df, cols_to_merge=None, drop_old_cols=True, separator=';', join_strings=False, see_progress=True, inplace=False): '''Merge columns that have been created, as a consequence of a dataframe merge operation, resulting in duplicate columns with suffixes. Parameters ---------- df : pandas.DataFrame or dask.DataFrame Dataframe that will have its columns merged. cols_to_merge : string or list of strings, default None The columns which will be regenerated, by merging its duplicates. If not specified, the algorithm will search for columns with suffixes. drop_old_cols : bool, default True If set to True, the preexisting duplicate columns will be removed. separator : string, default ';' Symbol that concatenates each string's words, which will be used to join the inputs if they are both strings. join_strings : bool, default False If set to True, in case of receiving two string inputs, the algorithm will joined them using the defined separator. Otherwise, the shortest string will be returned. see_progress : bool, default True If set to True, a progress bar will show up indicating the execution of the normalization calculations. inplace : bool, default False If set to True, the original tensor or dataframe will be used and modified directly. Otherwise, a copy will be created and returned, without changing the original tensor or dataframe. Returns ------- data_df : pandas.DataFrame or dask.DataFrame Dataframe with the new merged columns. ''' if not inplace: # Make a copy of the data to avoid potentially unwanted changes to the original dataframe data_df = df.copy() else: # Use the original dataframe data_df = df if cols_to_merge is None: print('Finding columns to merge...') # Find all columns that have typical merging suffixes cols_to_merge = set([col.split('_x')[0].split('_y')[0] for col in df.columns if col.endswith('_x') or col.endswith('_y')]) # Make sure that the cols_to_merge is a list if isinstance(cols_to_merge, str): cols_to_merge = [cols_to_merge] print('Merging the duplicate columns...') for col in utils.iterations_loop(cols_to_merge, see_progress=see_progress): # Check if the columns being merged are boolean is_bool = all([search_explore.is_boolean_column(data_df, col, n_unique_values=None)] for col in [f'{col}_x', f'{col}_y']) # Create a column, with the original name, merging the associated columns' values data_df[col] = data_df.apply(lambda x: merge_values(x[f'{col}_x'], x[f'{col}_y'], separator=separator, join_strings=join_strings, is_bool=is_bool), axis=1) if drop_old_cols: print('Removing old columns...') # Remove the old columns, with suffixes `_x` and '_y', which resulted # from the merge of dataframes for col in utils.iterations_loop(cols_to_merge, see_progress=see_progress): data_df = data_df.drop(columns=[f'{col}_x', f'{col}_y']) print('Done!') return data_df
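For instance, after a pandas merge that produced _x/_y duplicates (toy data below), the original column can be rebuilt and the suffixed ones dropped:

import pandas as pd
import numpy as np
import data_utils as du

df = pd.DataFrame({'id': [1, 2],
                   'weight_x': [70.0, np.nan],
                   'weight_y': [np.nan, 82.5]})
# Recreate a single 'weight' column from 'weight_x' and 'weight_y'
df = du.data_processing.merge_columns(df, cols_to_merge='weight')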
def merge_values(x1, x2, separator=';', str_over_num=True, join_strings=True, is_bool=False)
-
Merge two values, by extracting the non-missing one, their average value or the non-numeric one.
Parameters
x1
- Value 1 of the merge operation.
x2
- Value 2 of the merge operation.
separator : string, default ';'
- Symbol that concatenates each string's words, which will be used to join the inputs if they are both strings.
str_over_num : bool, default True
- If set to True, preference will be given to string inputs. Otherwise, numeric inputs will be prioritized.
join_strings : bool, default True
- If set to True, in case of receiving two string inputs, the algorithm will join them using the defined separator. Otherwise, the shortest string will be returned.
is_bool : bool, default False
- If set to True, the method will treat the values to merge as boolean (i.e. it will return either 1, if it's one of the values, or 0).
Returns
x
- Resulting merged value.
Expand source code
def merge_values(x1, x2, separator=';', str_over_num=True, join_strings=True, is_bool=False): '''Merge two values, by extracting the non-missing one, their average value or the non-numeric one. Parameters ---------- x1 Value 1 of the merge operation. x2 Value 2 of the merge operation. separator : string, default ';' Symbol that concatenates each string's words, which will be used to join the inputs if they are both strings. str_over_num : bool, default True If set to True, preference will be given to string inputs. Otherwise, numeric inputs will be prioritized. join_strings : bool, default True If set to True, in case of receiving two string inputs, the algorithm will joined them using the defined separator. Otherwise, the shortest string will be returned. is_bool : bool, default False If set to True, the method will treat the values to merge as boolean (i.e. it will return either 1, if it's one of the values, or 0). Returns ------- x Resulting merged value. ''' if is_bool is True: if (x1 is None or utils.is_num_nan(x1)) and (x2 is None or utils.is_num_nan(x2)): return 0 elif (x1 is None or utils.is_num_nan(x1)) and not (x2 is None or utils.is_num_nan(x2)): return x2 elif not (x1 is None or utils.is_num_nan(x1)) and (x2 is None or utils.is_num_nan(x2)): return x1 else: return max(x1, x2) if x1 is None and x2 is not None: return x2 elif x1 is not None and x2 is None: return x1 elif x1 == x2: return x1 elif ((isinstance(x1, float) or isinstance(x1, int)) and (isinstance(x2, float) or isinstance(x2, int))): # Get the average value between the columns, ignoring NaNs return np.nanmean([x1, x2]) elif isinstance(x1, str) and isinstance(x2, str): if not isinstance(separator, str): raise Exception(f'ERROR: Separator symbol must be in string format, not {type(separator)}.') if join_strings is True: # Join strings through the defined separator return separator.join([x1, x2]) else: # Return the shortest string if len(x1) <= len(x2): return x1 else: return x2 elif ((isinstance(x1, float) or isinstance(x1, int)) and not (isinstance(x2, float) or isinstance(x2, int))): if utils.is_num_nan(x1) and not utils.is_num_nan(x2): # Return the not NaN value return x2 if str_over_num is True: # Give preference to string values return x2 else: # Give preference to numeric values return x1 elif not ((isinstance(x1, float) or isinstance(x1, int)) and (isinstance(x2, float) or isinstance(x2, int))): if utils.is_num_nan(x2) and not utils.is_num_nan(x1): # Return the not NaN value return x1 if str_over_num is True: # Give preference to string values return x1 else: # Give preference to numeric values return x2 else: warnings.warn(f'Both values are different than NaN and are not numeric. Randomly returning the first value {x1}, instead of {x2}.') return x1
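A few direct calls illustrating the behaviour described above:

import data_utils as du

du.data_processing.merge_values(3.0, 5.0)      # average of two numeric values -> 4.0
du.data_processing.merge_values('iv', 'oral')  # two strings joined by the separator -> 'iv;oral'
du.data_processing.merge_values(None, 7.5)     # the non-missing value -> 7.5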
def missing_values_imputation(data, columns_to_imputate=None, method='zero', id_column=None, zero_bool=True, reset_index=True, search_by_dtypes=False, inplace=False)
-
Performs missing values imputation on a tensor corresponding to a single column or on a dataframe. NOTE: Most imputation methods don't work with float16 data types, and interpolation can't be applied to nullable integer types.
Parameters
data : torch.Tensor or pandas.DataFrame or dask.DataFrame
- PyTorch tensor corresponding to a single column or a dataframe which will be imputed.
columns_to_imputate : str or list of str, default None
- Specific column(s) to run missing values imputation on. Might be useful if some columns should be imputed with a specific method, different from the rest. If left unspecified, all columns will be imputed with the same method.
method : string, default 'zero'
- Imputation method to be used. If the user inputs 'zero', it will just fill all missing values with zero. If the user chooses 'zigzag', it will do a forward fill, a backward fill and then replace all remaining missing values with zero (this option is only available for dataframes, not tensors). If the user selects 'interpolation', missing data will be interpolated based on known neighboring values and then all possible remaining ones are replaced with zero (this option is only available for dataframes, not tensors).
id_column : string, default None
- Name of the column which corresponds to the sequence or subject identifier in the dataframe. If not specified, the imputation will not differentiate between different IDs or sequences. Only used if the chosen imputation method is 'zigzag' or 'interpolation'.
zero_bool : bool, default True
- If set to True, it will look for boolean features and replace their missing values with zero, regardless of the chosen imputation method.
reset_index : bool, default True
- If set to True (recommended), the dataframe's index will be reset. This can prevent values from being assigned to the wrong rows.
search_by_dtypes : bool, default False
- If set to True, the method will only look for boolean columns based on their data type. This is only reliable if all the columns' data types have been properly set.
inplace : bool, default False
- If set to True, the original tensor or dataframe will be used and modified directly. Otherwise, a copy will be created and returned, without changing the original tensor or dataframe.
Returns
tensor : torch.Tensor or pandas.DataFrame or dask.DataFrame
- Imputed PyTorch tensor or dataframe.
Expand source code
def missing_values_imputation(data, columns_to_imputate=None, method='zero', id_column=None, zero_bool=True, reset_index=True, search_by_dtypes=False, inplace=False): '''Performs missing values imputation to a tensor or dataframe corresponding to a single column. NOTE: Most imputation methods don't work with float16 data types and interpolation can't be applied to nullable integer types. Parameters ---------- data : torch.Tensor or pandas.DataFrame or dask.DataFrame PyTorch tensor corresponding to a single column or a dataframe which will be imputed. columns_to_imputate : str or list of str, default None Specific column(s) to run missing values imputation on. Might be useful if some columns should be imputated in a specific method, different from the rest. If left unspecified, all columns will be imputated with the same method. method : string, default 'zero' Imputation method to be used. If user inputs 'zero', it will just fill all missing values with zero. If the user chooses 'zigzag', it will do a forward fill, a backward fill and then replace all remaining missing values with zero (this option is only available for dataframes, not tensors). If the user selects 'interpolation', missing data will be interpolated based on known neighboring values and then all possible remaining ones are replaced with zero (this option is only available for dataframes, not tensors). id_column : string, default None Name of the column which corresponds to the sequence or subject identifier in the dataframe. If not specified, the imputation will not differenciate different IDs nor sequences. Only used if the chosen imputation method is 'zigzag' or 'interpolation'. zero_bool : bool, default True If set to True, it will look for boolean features and replace their missing values with zero, regardless of the chosen imputation method. reset_index : bool, default True If set to True (recommended), the dataframe's index will be reset. This can prevent values from being assigned to the wrong rows. search_by_dtypes : bool, default False If set to True, the method will only look for boolean columns based on their data type. This is only reliable if all the columns' data types have been properly set. inplace : bool, default False If set to True, the original tensor or dataframe will be used and modified directly. Otherwise, a copy will be created and returned, without changing the original tensor or dataframe. Returns ------- tensor : torch.Tensor Imputed PyTorch tensor. 
''' if ((not isinstance(data, pd.DataFrame)) and (not isinstance(data, dd.DataFrame)) and (not isinstance(data, torch.Tensor))): raise Exception(f'ERROR: The input data must either be a PyTorch tensor, a Pandas dataframe or a Dask dataframe, not {type(data)}.') if not inplace: # Make a copy of the data to avoid potentially unwanted changes to the original data if isinstance(data, torch.Tensor): data_copy = data.clone() else: data_copy = data.copy() else: # Use the original data object data_copy = data # [TODO] Implement an option to only imputate specified column(s) # if columns is None: # columns = list(data_copy.columns) if reset_index is True: # Reset index to avoid assigning values in the wrong rows print('Resetting the index...') data_copy.reset_index(drop=True, inplace=True) if columns_to_imputate is None: # Imputate all the columns columns_to_imputate = list(data_copy.columns) # Make sure that the columns_to_imputate is a list if isinstance(columns_to_imputate, str): columns_to_imputate = [columns_to_imputate] if id_column is not None: # Make sure that the ID column is in columns_to_imputate if id_column not in columns_to_imputate: columns_to_imputate = [id_column] + columns_to_imputate if zero_bool is True: # Check if there are boolean features print('Searching for boolean features...') bool_feat = search_explore.list_boolean_columns(data_copy, search_by_dtypes=search_by_dtypes) if len(bool_feat) > 0: # Fill all boolean features' missing values with zeros print('Replacing boolean features\' missing values with zero...') data_copy.loc[:, bool_feat] = data_copy[bool_feat].fillna(value=0) # Remove the boolean columns from the list of columns to imputate columns_to_imputate = list(set(columns_to_imputate) - set(bool_feat)) if method.lower() == 'zero': # Replace NaN's with zeros print('Replacing missing values with zero...') if isinstance(data, pd.DataFrame) or isinstance(data, dd.DataFrame): data_copy.loc[:, columns_to_imputate] = data_copy[columns_to_imputate].fillna(value=0) elif isinstance(data, torch.Tensor): # [TODO] Add the ability to specify the tensor columns to imputate data_copy = torch.where(data_copy != data_copy, torch.zeros_like(data_copy), data_copy) elif method.lower() == 'zigzag': if isinstance(data, pd.DataFrame) or isinstance(data, dd.DataFrame): if id_column is not None: # Perform imputation on each ID separately # Forward fill and backward fill print('Forward filling and backward filling missing values...') data_copy.loc[:, columns_to_imputate] = data_copy[columns_to_imputate].groupby(id_column).apply(lambda group: group.ffill().bfill()) # Replace remaining missing values with zero print('Replacing remaining missing values with zero...') data_copy.loc[:, columns_to_imputate] = data_copy[columns_to_imputate].fillna(value=0) else: # Apply imputation on all the data as one single sequence # Forward fill print('Forward filling missing values...') data_copy.loc[:, columns_to_imputate] = data_copy[columns_to_imputate].ffill() # Backward fill print('Backward filling missing values...') data_copy.loc[:, columns_to_imputate] = data_copy[columns_to_imputate].bfill() # Replace remaining missing values with zero print('Replacing remaining missing values with zero...') data_copy.loc[:, columns_to_imputate] = data_copy[columns_to_imputate].fillna(value=0) elif isinstance(data, torch.Tensor): raise Exception('ERROR: PyTorch tensors aren\'t supported in the zigzag imputation method. 
Please use a dataframe instead.') elif method.lower() == 'interpolation': if isinstance(data, pd.DataFrame) or isinstance(data, dd.DataFrame): # Linear interpolation, placing a linear scale between known points and doing simple # backward and forward fill, when the missing value doesn't have known data points # before or after, respectively # NOTE: Since the interpolate method doesn't work on nullable integer data types, # we need to find and separate columns with that dtype and apply zigzag imputation on them columns_cant_interpolate = list() for col in columns_to_imputate: if (('Int' in str(data[col].dtype) or 'boolean' in str(data[col].dtype)) and col != id_column): columns_cant_interpolate.append(col) columns_to_imputate.remove(col) if id_column is not None: try: if len(columns_cant_interpolate) > 0: # Perform zigzag imputation on columns that can't be interpolated print('Running zigzag imputation on columns that can\'t be interpolated...') print(f'(These columns are {columns_cant_interpolate})') columns_cant_interpolate = [id_column] + columns_cant_interpolate # Forward fill and backward fill print('Forward filling and backward filling missing values...') data_copy.loc[:, columns_cant_interpolate] = data_copy[columns_cant_interpolate].groupby(id_column).apply(lambda group: group.ffill().bfill()) # Replace remaining missing values with zero print('Replacing remaining missing values with zero...') data_copy.loc[:, columns_cant_interpolate] = data_copy[columns_cant_interpolate].fillna(value=0) # There's no need to interpolate if the only column in columns_to_imputate is the ID column if len(columns_to_imputate) > 1: # Perform imputation on each ID separately print('Interpolating missing values...') data_copy.loc[:, columns_to_imputate] = data_copy[columns_to_imputate].groupby(id_column)[columns_to_imputate].apply(lambda group: group.interpolate(limit_direction='both')) # Replace remaining missing values with zero print('Replacing remaining missing values with zero...') data_copy.loc[:, columns_to_imputate] = data_copy[columns_to_imputate].fillna(value=0) except ValueError as e: warnings.warn(f'Initial attempt to interpolate failed. 
Original exception message: "{str(e)}"\nTrying again after replacing all possible <NA> occurences with a Numpy NaN.') # Save the current data types dtype_dict = dict(data_copy.dtypes) # Replace the '<NA>' objects with NumPy's NaN data_copy = data_copy.applymap(lambda x: x if not utils.is_num_nan(x) else np.nan) print('Finished replacing all possible <NA> values.') # Perform imputation on each ID separately print('Interpolating missing values...') data_copy.loc[:, columns_to_imputate] = data_copy[columns_to_imputate].groupby(id_column)[columns_to_imputate].apply(lambda group: group.interpolate(limit_direction='both')) # Replace remaining missing values with zero print('Replacing remaining missing values with zero...') data_copy.loc[:, columns_to_imputate] = data_copy[columns_to_imputate].fillna(value=0) # Convert the data types back to the original ones print('Converting data types back to the original ones...') data_copy = utils.convert_dtypes(data_copy, dtypes=dtype_dict, inplace=True) else: try: if len(columns_cant_interpolate) > 0: # Perform zigzag imputation on columns that can't be interpolated print('Running zigzag imputation on columns that can\'t be interpolated...') print(f'(These columns are {columns_cant_interpolate})') # Forward fill print('Forward filling missing values...') data_copy.loc[:, columns_cant_interpolate] = data_copy[columns_cant_interpolate].ffill() # Backward fill print('Backward filling missing values...') data_copy.loc[:, columns_cant_interpolate] = data_copy[columns_cant_interpolate].bfill() # Replace remaining missing values with zero print('Replacing remaining missing values with zero...') data_copy.loc[:, columns_cant_interpolate] = data_copy[columns_cant_interpolate].fillna(value=0) # There's no need to interpolate if columns_to_imputate is empty if len(columns_to_imputate) > 0: # Apply imputation on all the data as one single sequence print('Interpolating missing values...') data_copy.loc[:, columns_to_imputate] = data_copy[columns_to_imputate].interpolate(limit_direction='both') # Replace remaining missing values with zero print('Replacing remaining missing values with zero...') data_copy.loc[:, columns_to_imputate] = data_copy[columns_to_imputate].fillna(value=0) except ValueError as e: warnings.warn(f'Initial attempt to interpolate failed. Original exception message: "{str(e)}"\nTrying again after replacing all possible <NA> occurences with a Numpy NaN.') # Save the current data types dtype_dict = dict(data_copy.dtypes) data_copy = utils.convert_dtypes(data_copy, dtypes=dtype_dict, inplace=True) print('Finished replacing all possible <NA> values.') # Apply imputation on all the data as one single sequence print('Interpolating missing values...') data_copy.loc[:, columns_to_imputate] = data_copy[columns_to_imputate].interpolate(limit_direction='both') # Replace remaining missing values with zero print('Replacing remaining missing values with zero...') data_copy.loc[:, columns_to_imputate] = data_copy[columns_to_imputate].fillna(value=0) # Convert the data types back to the original ones print('Converting data types back to the original ones...') data_copy = utils.convert_dtypes(data_copy, dtypes=dtype_dict, inplace=True) elif isinstance(data, torch.Tensor): raise Exception('ERROR: PyTorch tensors aren\'t supported in the interpolation imputation method. Please use a dataframe instead.') else: raise Exception(f'ERROR: Unsupported {method} imputation method. 
Currently available options are `zero` and `zigzag`.') # [TODO] Add other, more complex imputation methods, like a denoising autoencoder print('Done!') return data_copy
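A hedged sketch of per-stay interpolation on a hypothetical vital-signs dataframe (the column names and values are illustrative):

import pandas as pd
import numpy as np
import data_utils as du

df = pd.DataFrame({'patientunitstayid': [101, 101, 101, 102, 102, 103],
                   'ts': [0, 1, 2, 0, 1, 0],
                   'heart_rate': [80.0, np.nan, 90.0, np.nan, 110.0, 75.0]})
# Interpolate gaps within each stay; anything still missing afterwards is set to zero
df = du.data_processing.missing_values_imputation(df, method='interpolation',
                                                  id_column='patientunitstayid',
                                                  inplace=False)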
def normalize_data(df, data=None, id_columns=['patientunitstayid', 'ts'], normalization_method='z-score', columns_to_normalize=None, columns_to_normalize_categ=None, categ_columns=None, see_progress=True, get_stats=False, search_by_dtypes=False, inplace=False)
-
Performs data normalization to a continuous valued tensor or dataframe, changing the scale of the data.
Parameters
df : pandas.DataFrame or dask.DataFrame
- Original Pandas or Dask dataframe which is used to correctly calculate the necessary statistical values used in the normalization. These values can't be calculated from the tensor as it might have been padded. If the data tensor isn't specified, the normalization is applied directly on the dataframe.
data : torch.Tensor, default None
- PyTorch tensor corresponding to the data which will be normalized by the specified normalization method. If the data tensor isn't specified, the normalization is applied directly on the dataframe.
id_columns : string or list of strings, default ['patientunitstayid', 'ts']
- List of column names which represent identifier columns. These are not supposed to be normalized.
normalization_method : string, default 'z-score'
- Specifies the normalization method used. It can be a z-score normalization, where the data is subtracted of its mean and divided by the standard deviation, which makes it have zero average and unit variance, much like a standard normal distribution; or it can be a min-max normalization, where the data is subtracted by its minimum value and then divided by the difference between the maximum and the minimum value, getting to a fixed range from 0 to 1.
columns_to_normalize : string or list of strings, default None
- If specified, the columns provided in the list are the only ones that will be normalized. If set to False, no column will be normalized directly, although columns can still be normalized in groups of categories, if specified in the columns_to_normalize_categ parameter. Otherwise, all continuous columns will be normalized.
columns_to_normalize_categ : tuple or list of tuples, default None
- If specified, the columns provided in the list are going to be normalized on their categories. That is, the values (column 2 in the tuple) are normalized with the stats of their respective categories (column 1 of the tuple). Otherwise, no column will be normalized on its categories.
categ_columns : string or list of strings, default None
- If specified, the columns in the list, which represent categorical features that either are a label or will be embedded, aren't going to be normalized.
see_progress : bool, default True
- If set to True, a progress bar will show up indicating the execution of the normalization calculations.
get_stats : bool, default False
- If set to True, the stats used to normalize the data (e.g. mean and standard deviation) are also returned.
search_by_dtypes : bool, default False
- If set to True, the method will only look for boolean columns based on their data type. This is only reliable if all the columns' data types have been properly set.
inplace : bool, default False
- If set to True, the original dataframe will be used and modified directly. Otherwise, a copy will be created and returned, without changing the original dataframe.
Returns
data : pandas.DataFrame or dask.DataFrame or torch.Tensor
- Normalized Pandas or Dask dataframe or PyTorch tensor.
If get_stats == True and normalization_method == 'z-score':
mean : float or dict or list of floats or list of dicts
- Mean value(s) used in the data normalization.
std : float or dict or list of floats or list of dicts
- Standard deviation value(s) used in the data normalization.
If get_stats == True and normalization_method == 'min-max':
min : dict
- Minimum value(s) used in the data normalization.
max : dict
- Maximum value(s) used in the data normalization.
Expand source code
def normalize_data(df, data=None, id_columns=['patientunitstayid', 'ts'], normalization_method='z-score', columns_to_normalize=None, columns_to_normalize_categ=None, categ_columns=None, see_progress=True, get_stats=False, search_by_dtypes=False, inplace=False): '''Performs data normalization to a continuous valued tensor or dataframe, changing the scale of the data. Parameters ---------- df : pandas.DataFrame or dask.DataFrame Original Pandas or Dask dataframe which is used to correctly calculate the necessary statistical values used in the normalization. These values can't be calculated from the tensor as it might have been padded. If the data tensor isn't specified, the normalization is applied directly on the dataframe. data : torch.Tensor, default None PyTorch tensor corresponding to the data which will be normalized by the specified normalization method. If the data tensor isn't specified, the normalization is applied directly on the dataframe. id_columns : string or list of strings, default ['subject_id', 'ts'] List of columns names which represent identifier columns. These are not supposed to be normalized. normalization_method : string, default 'z-score' Specifies the normalization method used. It can be a z-score normalization, where the data is subtracted of its mean and divided by the standard deviation, which makes it have zero average and unit variance, much like a standard normal distribution; it can be a min-max normalization, where the data is subtracted by its minimum value and then divided by the difference between the minimum and the maximum value, getting to a fixed range from 0 to 1. columns_to_normalize : string or list of strings, default None If specified, the columns provided in the list are the only ones that will be normalized. If set to False, no column will be normalized directly, although columns can still be normalized in groups of categories, if specified in the `columns_to_normalize_categ` parameter. Otherwise, all continuous columns will be normalized. columns_to_normalize_categ : tuple or list of tuples of tuples, default None If specified, the columns provided in the list are going to be normalized on their categories. That is, the values (column 2 in the tuple) are normalized with stats of their respective categories (column 1 of the tuple). Otherwise, no column will be normalized on their categories. categ_columns : string or list of strings, default None If specified, the columns in the list, which represent categorical features, which either are a label or will be embedded, aren't going to be normalized. see_progress : bool, default True If set to True, a progress bar will show up indicating the execution of the normalization calculations. get_stats : bool, default False If set to True, the stats used to normalize the data (e.g. mean and standard deviation) are also outputed. search_by_dtypes : bool, default False If set to True, the method will only look for boolean columns based on their data type. This is only reliable if all the columns' data types have been properly set. inplace : bool, default False If set to True, the original dataframe will be used and modified directly. Otherwise, a copy will be created and returned, without changing the original dataframe. Returns ------- data : pandas.DataFrame or dask.DataFrame or torch.Tensor Normalized Pandas or Dask dataframe or PyTorch tensor. 
If get_stats == True and normalization_method == 'z-score': mean : float or dict or list of floats or list of dicts Mean value(s) used in the data normalization. std : float or dict or list of floats or list of dicts Standard deviation value(s) used in the data normalization. If get_stats == True and normalization_method == 'min-max': min : dict Minimum value(s) used in the data normalization. max : dict Maximum value(s) used in the data normalization. ''' # Check if specific columns have been specified for normalization if columns_to_normalize is None: # List of all columns in the dataframe feature_columns = list(df.columns) # Normalize all non identifier continuous columns, ignore one hot encoded ones columns_to_normalize = feature_columns if id_columns is not None: # Make sure that the id_columns is a list if isinstance(id_columns, str): id_columns = [id_columns] if not isinstance(id_columns, list): raise Exception(f'ERROR: The `id_columns` argument must be specified as either a single string or a list of strings. Received input with type {type(id_columns)}.') # List of all columns in the dataframe, except the ID columns [columns_to_normalize.remove(col) for col in id_columns] if categ_columns is not None: # Make sure that the categ_columns is a list if isinstance(categ_columns, str): categ_columns = [categ_columns] if not isinstance(categ_columns, list): raise Exception(f'ERROR: The `categ_columns` argument must be specified as either a single string or a list of strings. Received input with type {type(categ_columns)}.') # Prevent all features that will be embedded from being normalized [columns_to_normalize.remove(col) for col in categ_columns] # List of boolean or one hot encoded columns boolean_cols = search_explore.list_boolean_columns(df[columns_to_normalize], search_by_dtypes=search_by_dtypes) if boolean_cols is not None: # Prevent boolean features from being normalized [columns_to_normalize.remove(col) for col in boolean_cols] # Remove all non numeric columns that could be left columns_to_normalize = [col for col in columns_to_normalize if df[col].dtype == int or df[col].dtype == float] if columns_to_normalize is None: print('No columns to normalize, returning the original dataframe.') return df # Make sure that the columns_to_normalize is a list if isinstance(columns_to_normalize, str): columns_to_normalize = [columns_to_normalize] if not isinstance(columns_to_normalize, list) and not isinstance(columns_to_normalize, bool): raise Exception(f'ERROR: The `columns_to_normalize` argument must be specified as either a single string, a list of strings or a boolean. Received input with type {type(columns_to_normalize)}.') if type(normalization_method) is not str: raise ValueError('Argument normalization_method should be a string. Available options are "z-score" and "min-max".') if normalization_method.lower() == 'z-score': if columns_to_normalize is not False: # Calculate the means and standard deviations means = df[columns_to_normalize].mean() stds = df[columns_to_normalize].std() # Check if there are constant features const_feat = list(stds[stds == 0].index) if len(const_feat) > 0: # Prevent constant features from being normalized [columns_to_normalize.remove(col) for col in const_feat] means = means.drop(const_feat) stds = stds.drop(const_feat) warnings.warn(f'Found columns {const_feat} to be constant throughout all the data. 
They should be removed as no insight will be extracted from them.') if isinstance(df, dd.DataFrame): # Make sure that the values are computed, in case we're using Dask means = means.compute() stds = stds.compute() # Check if the data being normalized is directly the dataframe if data is None: if not inplace: # Make a copy of the data to avoid potentially unwanted changes to the original dataframe data = df.copy() else: # Use the original dataframe data = df # Normalize the right columns if columns_to_normalize is not False: print(f'z-score normalizing columns {columns_to_normalize}...') data[columns_to_normalize] = (data[columns_to_normalize] - means) / stds if columns_to_normalize_categ is not None: if get_stats is True: mean_list = [] std_list = [] # Make sure that the columns_to_normalize_categ is a list if isinstance(columns_to_normalize_categ, tuple): columns_to_normalize_categ = [columns_to_normalize_categ] if not isinstance(columns_to_normalize_categ, list): raise Exception(f'ERROR: The `columns_to_normalize_categ` argument must be specified as either a single tuple or a list of tuples. Received input with type {type(columns_to_normalize_categ)}.') print(f'z-score normalizing columns {columns_to_normalize_categ} by their associated categories...') for col_tuple in utils.iterations_loop(columns_to_normalize_categ, see_progress=see_progress): categ_columns = col_tuple[0] column_to_normalize = col_tuple[1] # Calculate the means and standard deviations means_grpb = df.groupby(categ_columns)[column_to_normalize].mean() stds_grpb = df.groupby(categ_columns)[column_to_normalize].std() if isinstance(df, dd.DataFrame): # Make sure that the values are computed, in case we're using Dask means_grpb = means.compute() stds_grpb = stds.compute() if get_stats is True: if isinstance(column_to_normalize, str): # Make sure that the feature being normalized has its name specified in the stats tmp_mean_grpb = dict() tmp_std_grpb = dict() tmp_mean_grpb[column_to_normalize] = means_grpb.to_dict() tmp_std_grpb[column_to_normalize] = stds_grpb.to_dict() # Add the current stats values to the output lists mean_list.append(tmp_mean_grpb) std_list.append(tmp_std_grpb) else: # Add the current stats values to the output lists mean_list.append(means_grpb.to_dict()) std_list.append(stds_grpb.to_dict()) # Get the categories columns as a numpy array, so as to # index the groupby-resulting dataframes of mean and standard # deviation values cat_arr = df[categ_columns].to_numpy() if isinstance(categ_columns, list) and len(categ_columns) > 1: # Convert the sets of values into tuples so as to be # properly readable as dataframe indices cat_arr = list(map(tuple, cat_arr)) # Get the mean and standard deviation values in the same # order as the original dataframe's row order means_cat = means_grpb.loc[cat_arr].to_numpy() stds_cat = stds_grpb.loc[cat_arr].to_numpy() # Normalize the right categories data[column_to_normalize] = (data[column_to_normalize] - means_cat) / stds_cat if get_stats is True: # Merge all the stats dictionaries mean_categ_dict = utils.merge_dicts(mean_list) std_categ_dict = utils.merge_dicts(std_list) # Otherwise, the tensor is normalized else: if columns_to_normalize is not False: # Dictionaries to retrieve the mean and standard deviation values column_means = dict(means) column_stds = dict(stds) # Dictionary to convert the the tensor's column indices into the dataframe's column names idx_to_name = dict(enumerate(df.columns)) # Dictionary to convert the dataframe's column names into the tensor's column 
indices name_to_idx = dict([(t[1], t[0]) for t in enumerate(df.columns)]) # List of indices of the tensor's columns which are needing normalization tensor_columns_to_normalize = [name_to_idx[name] for name in columns_to_normalize] # Normalize the right columns print(f'z-score normalizing columns {columns_to_normalize}...') for col in utils.iterations_loop(tensor_columns_to_normalize, see_progress=see_progress): data[:, :, col] = ((data[:, :, col] - column_means[idx_to_name[col]]) / column_stds[idx_to_name[col]]) if get_stats is False: return data elif columns_to_normalize is not False and columns_to_normalize_categ is not None: return data, means.to_dict(), stds.to_dict(), mean_categ_dict, std_categ_dict elif columns_to_normalize is not False and columns_to_normalize_categ is None: return data, means.to_dict(), stds.to_dict() elif columns_to_normalize is False and columns_to_normalize_categ is not None: return data, mean_categ_dict, std_categ_dict elif normalization_method.lower() == 'min-max': if columns_to_normalize is not False: mins = df[columns_to_normalize].min() maxs = df[columns_to_normalize].max() # Check if there are constant features const_feat = list(mins[mins == maxs].index) if len(const_feat) > 0: # Prevent constant features from being normalized [columns_to_normalize.remove(col) for col in const_feat] mins = mins.drop(const_feat) maxs = maxs.drop(const_feat) warnings.warn(f'Found columns {const_feat} to be constant throughout all the data. They should be removed as no insight will be extracted from them.') if isinstance(df, dd.DataFrame): # Make sure that the values are computed, in case we're using Dask mins = means.compute() maxs = maxs.compute() # Check if the data being normalized is directly the dataframe if data is None: if not inplace: # Make a copy of the data to avoid potentially unwanted changes to the original dataframe data = df.copy() else: # Use the original dataframe data = df if columns_to_normalize is not False: # Normalize the right columns print(f'min-max normalizing columns {columns_to_normalize}...') data[columns_to_normalize] = (data[columns_to_normalize] - mins) / (maxs - mins) if columns_to_normalize_categ is not None: if get_stats is True: min_list = [] max_list = [] # Make sure that the columns_to_normalize_categ is a list if isinstance(columns_to_normalize_categ, tuple): columns_to_normalize_categ = [columns_to_normalize_categ] if not isinstance(columns_to_normalize_categ, list): raise Exception(f'ERROR: The `columns_to_normalize_categ` argument must be specified as either a single tuple or a list of tuples. 
Received input with type {type(columns_to_normalize_categ)}.') print(f'min-max normalizing columns {columns_to_normalize_categ} by their associated categories...') for col_tuple in columns_to_normalize_categ: categ_columns = col_tuple[0] column_to_normalize = col_tuple[1] # Calculate the minimum and maximum values mins_grpb = df.groupby(col_tuple[0])[col_tuple[1]].min() maxs_grpb = df.groupby(col_tuple[0])[col_tuple[1]].max() if isinstance(df, dd.DataFrame): # Make sure that the values are computed, in case we're using Dask mins_grpb = mins_grpb.compute() maxs_grpb = maxs_grpb.compute() if get_stats is True: if isinstance(column_to_normalize, str): # Make sure that the feature being normalized has its name specified in the stats tmp_min_grpb = dict() tmp_max_grpb = dict() tmp_min_grpb[column_to_normalize] = mins_grpb.to_dict() tmp_max_grpb[column_to_normalize] = maxs_grpb.to_dict() # Add the current stats values to the output lists min_list.append(tmp_min_grpb) max_list.append(tmp_max_grpb) else: # Add the current stats values to the output lists min_list.append(mins_grpb.to_dict()) max_list.append(maxs_grpb.to_dict()) # Get the categories columns as a numpy array, so as to # index the groupby-resulting dataframes of minimum and # maximum values cat_arr = df[categ_columns].to_numpy() if isinstance(categ_columns, list) and len(categ_columns) > 1: # Convert the sets of values into tuples so as to be # properly readable as dataframe indices cat_arr = list(map(tuple, cat_arr)) # Get the minimum and maximum values in the same # order as the original dataframe's row order mins_cat = mins_grpb.loc[cat_arr].to_numpy() maxs_cat = maxs_grpb.loc[cat_arr].to_numpy() # Normalize the right categories data[column_to_normalize] = (data[column_to_normalize] - mins_cat) / (maxs_cat - mins_cat) if get_stats is True: # Merge all the stats dictionaries min_categ_dict = utils.merge_dicts(min_list) max_categ_dict = utils.merge_dicts(max_list) # Otherwise, the tensor is normalized else: if columns_to_normalize is not False: # Dictionaries to retrieve the min and max values column_mins = dict(mins) column_maxs = dict(maxs) # Dictionary to convert the the tensor's column indices into the dataframe's column names idx_to_name = dict(enumerate(df.columns)) # Dictionary to convert the dataframe's column names into the tensor's column indices name_to_idx = dict([(t[1], t[0]) for t in enumerate(df.columns)]) # List of indices of the tensor's columns which are needing normalization tensor_columns_to_normalize = [name_to_idx[name] for name in columns_to_normalize] # Normalize the right columns print(f'min-max normalizing columns {columns_to_normalize}...') for col in utils.iterations_loop(tensor_columns_to_normalize, see_progress=see_progress): data[:, :, col] = ((data[:, :, col] - column_mins[idx_to_name[col]]) / (column_maxs[idx_to_name[col]] - column_mins[idx_to_name[col]])) if get_stats is False: return data elif columns_to_normalize is not False and columns_to_normalize_categ is not None: return data, mins.to_dict(), maxs.to_dict(), min_categ_dict, max_categ_dict elif columns_to_normalize is not False and columns_to_normalize_categ is None: return data, mins.to_dict(), maxs.to_dict() elif columns_to_normalize is False and columns_to_normalize_categ is not None: return data, min_categ_dict, max_categ_dict else: raise ValueError(f'{normalization_method} isn\'t a valid normalization method. Available options \ are "z-score" and "min-max".')
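A minimal usage sketch for normalize_data, on a toy Pandas dataframe (column names and values are illustrative):
import pandas as pd
from data_utils.data_processing import normalize_data

df = pd.DataFrame({'patientunitstayid': [1, 1, 2, 2],
                   'ts': [0, 1, 0, 1],
                   'heart_rate': [80.0, 95.0, 60.0, 72.0]})
# 'patientunitstayid' and 'ts' match the default id_columns, so they are left untouched;
# with get_stats=True and min-max scaling, the minimum and maximum values are also returned
norm_df, mins, maxs = normalize_data(df, normalization_method='min-max', get_stats=True)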
def one_hot_encoding_dataframe(df, columns, clean_name=True, clean_missing_values=True, specific_nan_strings=[], lower_case=False, has_nan=False, join_rows=False, join_by=['patientunitstayid', 'ts'], get_new_column_names=False, search_by_dtypes=False, inplace=False)
-
Transforms specified column(s) from a dataframe into a one hot encoding representation.
Parameters
df : pandas.DataFrame or dask.DataFrame
    Dataframe that will be used, which contains the specified column(s).
columns : list of strings
    Name of the column(s) that will be converted to one hot encoding.
clean_name : bool, default True
    If set to True, changes the name of the categorical values into lower case, with words separated by an underscore instead of space.
clean_missing_values : bool, default True
    If set to True, the algorithm will search for missing value representations and replace them with the standard NumPy NaN value.
specific_nan_strings : list of strings, default []
    Parameter where the user can specify additional strings that should correspond to missing values.
lower_case : bool, default False
    If set to True, all strings will be converted to lower case.
has_nan : bool, default False
    If set to True, will first fill the missing values (NaN) with the string f'{column}_missing_value'.
join_rows : bool, default False
    If set to True, will group the rows created by the one hot encoding by summing the boolean values in the rows that have the same identifiers.
join_by : string or list, default ['subject_id', 'ts']
    Name of the column (or columns) which serves as a unique identifier of the dataframe's rows, which will be used in the groupby operation if the parameter join_rows is set to True. Can be a string (single column) or a list of strings (multiple columns).
get_new_column_names : bool, default False
    If set to True, the names of the new columns will also be output.
search_by_dtypes : bool, default False
    If set to True, the method will only look for boolean columns based on their data type. This is only reliable if all the columns' data types have been properly set.
inplace : bool, default False
    If set to True, the original dataframe will be used and modified directly. Otherwise, a copy will be created and returned, without changing the original dataframe.
Raises
ColumnNotFoundError
    Column name not found in the dataframe.
Returns
ohe_df : pandas.DataFrame or dask.DataFrame
    Returns a new dataframe with the specified column(s) in a one hot encoding representation.
new_column_names : list of strings
    List of the new, one hot encoded columns' names.
Expand source code
def one_hot_encoding_dataframe(df, columns, clean_name=True, clean_missing_values=True, specific_nan_strings=[], lower_case=False, has_nan=False, join_rows=False, join_by=['patientunitstayid', 'ts'], get_new_column_names=False, search_by_dtypes=False, inplace=False): '''Transforms specified column(s) from a dataframe into a one hot encoding representation. Parameters ---------- df : pandas.DataFrame or dask.DataFrame Dataframe that will be used, which contains the specified column. columns : list of strings Name of the column(s) that will be conveted to one hot encoding. clean_name : bool, default True If set to true, changes the name of the categorical values into lower case, with words separated by an underscore instead of space. clean_missing_values : bool, default True If set to True, the algorithm will search for missing value representations and replace them with the standard, NumPy NaN value. specific_nan_strings : list of strings, default [] Parameter where the user can specify additional strings that should correspond to missing values. lower_case : bool, default False If set to True, all strings will be converted to lower case. has_nan : bool, default False If set to true, will first fill the missing values (NaN) with the string f'{column}_missing_value'. join_rows : bool, default False If set to true, will group the rows created by the one hot encoding by summing the boolean values in the rows that have the same identifiers. join_by : string or list, default ['subject_id', 'ts']) Name of the column (or columns) which serves as a unique identifier of the dataframe's rows, which will be used in the groupby operation if the parameter join_rows is set to true. Can be a string (single column) or a list of strings (multiple columns). get_new_column_names : bool, default False If set to True, the names of the new columns will also be outputed. search_by_dtypes : bool, default False If set to True, the method will only look for boolean columns based on their data type. This is only reliable if all the columns' data types have been properly set. inplace : bool, default False If set to True, the original dataframe will be used and modified directly. Otherwise, a copy will be created and returned, without changing the original dataframe. Raises ------ ColumnNotFoundError Column name not found in the dataframe. Returns ------- ohe_df : pandas.DataFrame or dask.DataFrame Returns a new dataframe with the specified column in a one hot encoding representation. new_column_names : list of strings List of the new, one hot encoded columns' names. ''' if not inplace: # Make a copy of the data to avoid potentially unwanted changes to the original dataframe data_df = df.copy() else: # Use the original dataframe data_df = df # Make sure that the columns is a list if isinstance(columns, str): columns = [columns] if not isinstance(columns, list): raise Exception(f'ERROR: The `columns` argument must be specified as either a single string or a list of strings. 
Received input with type {type(columns)}.') print('Cleaning the categorical columns...') for col in utils.iterations_loop(columns): # Check if the column exists if col not in data_df.columns: raise Exception('ERROR: Column name not found in the dataframe.') if clean_name is True: # Clean the column's string values to have the same, standard format data_df = clean_categories_naming(data_df, col, clean_missing_values, specific_nan_strings, lower_case) if has_nan is True: # Fill NaN with "missing_value" name data_df[col] = data_df[col].fillna(value='missing_value') # Cast the variable into the built in pandas Categorical data type if isinstance(data_df, pd.DataFrame): data_df[col] = pd.Categorical(data_df[col]) if isinstance(data_df, dd.DataFrame): data_df = data_df.categorize(columns) if get_new_column_names is True: # Find the previously existing column names old_column_names = data_df.columns print('Getting dummies...') # Apply the one hot encoding to the specified columns if isinstance(data_df, dd.DataFrame): ohe_df = dd.get_dummies(data_df, columns=columns) else: ohe_df = pd.get_dummies(data_df, columns=columns) if join_rows is True: # Columns which are one hot encoded ohe_columns = search_explore.list_boolean_columns(ohe_df, search_by_dtypes=search_by_dtypes) # Group the rows that have the same identifiers ohe_df = ohe_df.groupby(join_by).sum(min_count=1).reset_index() # Clip the one hot encoded columns to a maximum value of 1 # (there might be duplicates which cause values bigger than 1) ohe_df.loc[:, ohe_columns] = ohe_df[ohe_columns].clip(upper=1) print('Done!') if get_new_column_names is True: # Find the new column names and output them new_column_names = list(set(ohe_df.columns) - set(old_column_names)) new_column_names.sort() return ohe_df, new_column_names else: return ohe_df
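A minimal usage sketch for one_hot_encoding_dataframe, on a toy categorical column (names are illustrative):
import pandas as pd
from data_utils.data_processing import one_hot_encoding_dataframe

df = pd.DataFrame({'patientunitstayid': [1, 1, 2],
                   'ts': [0, 1, 0],
                   'drug_unit': ['mg', 'mL', 'mg']})
ohe_df, new_cols = one_hot_encoding_dataframe(df, columns=['drug_unit'],
                                              get_new_column_names=True)
# new_cols should contain names such as 'drug_unit_mg' and 'drug_unit_ml',
# since clean_name=True lower cases the category values before encoding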
def remove_cols_with_many_nans(df, nan_percent_thrsh=40, inplace=False)
-
Remove columns that have too many NaN's (missing values).
Parameters
df : pandas.DataFrame or dask.DataFrame
    Dataframe that will be processed, to remove columns with high percentages of missing values.
nan_percent_thrsh : int or float, default 40
    Threshold value above which a column is considered to have too many missing values. Measured as a percentage of missing values, in 100% format.
inplace : bool, default False
    If set to True, the original dataframe will be used and modified directly. Otherwise, a copy will be created and returned, without changing the original dataframe.
Returns
df : pandas.DataFrame or dask.DataFrame
    Corrected dataframe, with the columns that had too many missing values removed.
Expand source code
def remove_cols_with_many_nans(df, nan_percent_thrsh=40, inplace=False):
    '''Remove columns that have too many NaN's (missing values).

    Parameters
    ----------
    df : pandas.DataFrame or dask.DataFrame
        Dataframe that will be processed, to remove columns with high
        percentages of missing values.
    nan_percent_thrsh : int or float, default 40
        Threshold value above which a column is considered to have too many
        missing values. Measured as a percentage of missing values, in 100% format.
    inplace : bool, default False
        If set to True, the original dataframe will be used and modified
        directly. Otherwise, a copy will be created and returned, without
        changing the original dataframe.

    Returns
    -------
    df : pandas.DataFrame or dask.DataFrame
        Corrected dataframe, with the columns that had too many missing values removed.
    '''
    if not inplace:
        # Make a copy of the data to avoid potentially unwanted changes to the original dataframe
        data_df = df.copy()
    else:
        # Use the original dataframe
        data_df = df
    # Find each column's missing values percentage
    nan_percent_df = search_explore.dataframe_missing_values(data_df)
    # Remove columns that exceed the missing values percentage threshold
    many_nans_cols = list(nan_percent_df[nan_percent_df.percent_missing > nan_percent_thrsh].column_name)
    data_df = data_df.drop(many_nans_cols, axis=1)
    return data_df
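A minimal usage sketch for remove_cols_with_many_nans, with a toy dataframe where one column is mostly missing:
import numpy as np
import pandas as pd
from data_utils.data_processing import remove_cols_with_many_nans

df = pd.DataFrame({'heart_rate': [80.0, 95.0, 60.0, 72.0],
                   'rare_lab': [np.nan, np.nan, np.nan, 1.2]})
# 'rare_lab' is 75% missing, above the default 40% threshold, so it is dropped
clean_df = remove_cols_with_many_nans(df, nan_percent_thrsh=40)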
def remove_rows_unmatched_key(df, key, columns)
-
Remove rows corresponding to the keys that weren't in the dataframe merged at the right.
Parameters
df : pandas.DataFrame or dask.DataFrame
    Dataframe resulting from an asof merge which will be searched for missing values.
key : string
    Name of the column which was used as the "by" key in the asof merge. Typically represents a temporal feature from a time series, such as days or timestamps.
columns : list of strings
    Name of the column(s), originating from the dataframe which was merged at the right, which should not have any missing values. If they do, it means that the corresponding key wasn't present in the original dataframe. Even if there's just one column to analyze, it should be provided in list format.
Returns
df : pandas.DataFrame or dask.DataFrame
    Returns the input dataframe without the rows which didn't have any values in the right dataframe's features.
Expand source code
def remove_rows_unmatched_key(df, key, columns):
    '''Remove rows corresponding to the keys that weren't in the dataframe
    merged at the right.

    Parameters
    ----------
    df : pandas.DataFrame or dask.DataFrame
        Dataframe resulting from an asof merge which will be searched for
        missing values.
    key : string
        Name of the column which was used as the "by" key in the asof merge.
        Typically represents a temporal feature from a time series, such as
        days or timestamps.
    columns : list of strings
        Name of the column(s), originating from the dataframe which was merged
        at the right, which should not have any missing values. If they do, it
        means that the corresponding key wasn't present in the original
        dataframe. Even if there's just one column to analyze, it should be
        provided in list format.

    Returns
    -------
    df : pandas.DataFrame or dask.DataFrame
        Returns the input dataframe without the rows which didn't have any
        values in the right dataframe's features.
    '''
    for k in utils.iterations_loop(df[key].unique()):
        # Number of columns which don't have any value
        # (i.e. all rows are missing values) for a given identifier 'k'
        num_empty_columns = 0
        for col in columns:
            if df[df[key] == k][col].isnull().sum() == len(df[df[key] == k]):
                # Found one more column which is full of missing values for identifier 'k'
                num_empty_columns += 1
        if num_empty_columns == len(columns):
            # Eliminate all rows corresponding to the analysed key if all the
            # columns are empty for the identifier 'k'
            df = df[~(df[key] == k)]
    return df
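A minimal usage sketch for remove_rows_unmatched_key, on a toy dataframe standing in for the result of an asof merge (column names are illustrative):
import numpy as np
import pandas as pd
from data_utils.data_processing import remove_rows_unmatched_key

# 'ts' was the "by" key of the merge; key 2 found no match in the right dataframe,
# so 'lab_value' is entirely missing for it and its rows are removed
merged_df = pd.DataFrame({'ts': [1, 1, 2, 2],
                          'lab_value': [5.4, 5.6, np.nan, np.nan]})
filtered_df = remove_rows_unmatched_key(merged_df, key='ts', columns=['lab_value'])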
def rename_index(df, name)
-
Renames the dataframe's index to a desired name. Especially important for dask dataframes, as they don't support any elegant, one-line method for this.
Parameters
df : pandas.DataFrame or dask.DataFrame
    Dataframe whose index column will be renamed.
name : string
    The new name for the index column.
Returns
df : pandas.DataFrame or dask.DataFrame
    Dataframe with a renamed index column.
Expand source code
def rename_index(df, name):
    '''Renames the dataframe's index to a desired name. Especially important
    for dask dataframes, as they don't support any elegant, one-line method
    for this.

    Parameters
    ----------
    df : pandas.DataFrame or dask.DataFrame
        Dataframe whose index column will be renamed.
    name : string
        The new name for the index column.

    Returns
    -------
    df : pandas.DataFrame or dask.DataFrame
        Dataframe with a renamed index column.
    '''
    if isinstance(df, dd.DataFrame):
        feat_names = set(df.columns)
        df = df.reset_index()
        orig_idx_name = set(df.columns) - feat_names
        orig_idx_name = orig_idx_name.pop()
        df = df.rename(columns={orig_idx_name: name})
        df = df.set_index(name)
    elif isinstance(df, pd.DataFrame):
        df.index.names = [name]
    else:
        raise Exception(f'ERROR: Input "df" should either be a pandas dataframe or a dask dataframe, not type {type(df)}.')
    return df
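A short usage sketch for rename_index, on a toy Pandas dataframe:
import pandas as pd
from data_utils.data_processing import rename_index

df = pd.DataFrame({'heart_rate': [80.0, 95.0]}, index=[101, 102])
df = rename_index(df, 'patientunitstayid')
# df.index.name is now 'patientunitstayid'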
def save_chunked_data(df, file_name, n_chunks=None, batch_size=1, id_column=None, data_path='', format='feather')
-
Save a dataframe in chunks, i.e. in separate files, so as to prevent memory issues and other problems when loading it back again.
Parameters
df : pandas.DataFrame or dask.DataFrame
    Dataframe which will be saved in chunks.
file_name : str
    Name to be given to the file.
n_chunks : int, default None
    Number of chunks, i.e. number of files, into which to split and save the dataframe.
batch_size : int, default 1
    Defines the batch size, i.e. the number of samples used in each training iteration to update the model's weights.
id_column : string, default None
    Name of the column which corresponds to the sequence or subject identifier in the dataframe. If specified, the data will be saved in files containing a batch_size number of unique IDs. This is useful if we're working with large datasets, which therefore need to be loaded file by file, lazily, in each training or inference batch.
data_path : str, default ''
    Directory path where the file will be stored.
format : str, default 'feather'
    Data format used to save the dataframe. Currently, the only available option is 'feather'.
Expand source code
def save_chunked_data(df, file_name, n_chunks=None, batch_size=1, id_column=None, data_path='', format='feather'): '''Save a dataframe in chunks, i.e. in separate files, so as to prevent memory issues and other problems when loading it back again. Parameters ---------- df : pandas.DataFrame or dask.DataFrame Dataframe which will be saved in chunks. file_name : str Name to be given to the file. n_chunks : int, default None Number of chunks, i.e. number of files, on which to split and save the dataframe. batch_size : int, default 1 Defines the batch size, i.e. the number of samples used in each training iteration to update the model's weights. id_column : string, default None Name of the column which corresponds to the sequence or subject identifier in the dataframe. If specified, the data will be saved in files containing a `batch_size` number of unique IDs. This is useful if we're working with large datasets, which therefore need to be loaded file by file, lazily, in each training or inference batch. data_path : str, default '' Directory path where the file will be stored. format : str, default 'feather' Data format used to saved the dataframe. Currently available options are 'feather'. ''' n_rows = len(df) format = str(format).lower() if format == 'feather': file_ext = '.ftr' else: raise Exception(f'ERROR: Invalid data format "{format}". Please choose one of the currently supported formats "feather".') if n_chunks is not None: # Total number of rows per file chunk_size = int(n_rows / n_chunks) for i in du.utils.iterations_loop(range(n_chunks)): # Get a chunk of the dataframe if i < n_chunks-1: df_i = df.iloc[i*chunk_size:(i+1)*chunk_size] else: df_i = df.iloc[i*chunk_size:] # Reset the index, so as to make it feather compatible df_i.reset_index(drop=True, inplace=True) # Save the current dataframe df_i.to_feather(f'{data_path}{file_name}_{i}{file_ext}') # Remove the already saved dataframe from memory del df_i elif batch_size is not None and id_column is not None: # List of unique sequence identifiers ids = list(df[id_column].unique()) # Number of unique IDs n_ids = len(ids) # Total number of files to be saved n_chunks = max(1, math.ceil(n_ids / batch_size)) for i in du.utils.iterations_loop(range(n_chunks)): # Set the current batch's list of IDs if i < n_chunks-1: ids_i = ids[i*batch_size:(i+1)*batch_size] else: ids_i = ids[i*batch_size:] # Get a chunk of the dataframe df_i = df[df[id_column].isin(ids_i)] # Reset the index, so as to make it feather compatible df_i.reset_index(drop=True, inplace=True) # Save the current dataframe df_i.to_feather(f'{data_path}{file_name}_{i}{file_ext}') # Remove the already saved dataframe from memory del df_i else: raise Exception(f'ERROR: Invalid set of input parameters. The user must either specify a number of chunks (`n_chunks`) to save the data or a batch size (`batch_size`) and an ID column (`id_column`) on which to fetch sequences.')
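A minimal usage sketch for save_chunked_data, writing one Feather file per batch of unique IDs (toy data; saving to Feather requires a compatible backend such as pyarrow):
import pandas as pd
from data_utils.data_processing import save_chunked_data

df = pd.DataFrame({'patientunitstayid': [1, 1, 2, 2, 3],
                   'ts': [0, 1, 0, 1, 0],
                   'heart_rate': [80.0, 95.0, 60.0, 72.0, 88.0]})
# One file per batch of 2 unique IDs: vitals_0.ftr (IDs 1 and 2) and vitals_1.ftr (ID 3)
save_chunked_data(df, file_name='vitals', batch_size=2,
                  id_column='patientunitstayid', data_path='')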
def set_dosage_and_units(df, orig_column='dosage', new_column_names=['drug_dosage', 'drug_unit'])
-
Separate medication dosage string column into numeric dosage and units features.
Parameters
df : pandas.DataFrame or dask.DataFrame
    Dataframe containing the medication dosage information.
orig_column : string, default 'dosage'
    Name of the original column, which will be split in two.
new_column_names : list of strings, default ['drug_dosage', 'drug_unit']
    Names to give to the new numeric dosage and units columns.
Returns
df : pandas.DataFrame or dask.DataFrame
    Dataframe after adding the numeric dosage and units columns.
Expand source code
def set_dosage_and_units(df, orig_column='dosage', new_column_names=['drug_dosage', 'drug_unit']):
    '''Separate medication dosage string column into numeric dosage and units
    features.

    Parameters
    ----------
    df : pandas.DataFrame or dask.DataFrame
        Dataframe containing the medication dosage information.
    orig_column : string, default 'dosage'
        Name of the original column, which will be split in two.
    new_column_names : list of strings, default ['drug_dosage', 'drug_unit']
        Names to give to the new numeric dosage and units columns.

    Returns
    -------
    df : pandas.DataFrame or dask.DataFrame
        Dataframe after adding the numeric dosage and units columns.
    '''
    # Separate the dosage and unit data
    dosage_unit_data = df[orig_column].apply(__sep_dosage_units)
    # Make sure that the new columns are created
    for col in new_column_names:
        df[col] = np.nan
    # Add the new dosage and units columns
    df[new_column_names] = pd.DataFrame(dosage_unit_data.to_numpy().tolist(),
                                        index=dosage_unit_data.index)
    return df
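A minimal usage sketch for set_dosage_and_units, assuming the module's private dosage parser handles strings of the form '<number> <unit>' (toy values):
import pandas as pd
from data_utils.data_processing import set_dosage_and_units

df = pd.DataFrame({'dosage': ['250 mg', '10 ml', '2 units']})
df = set_dosage_and_units(df)
# df should now also contain 'drug_dosage' and 'drug_unit' columns split from 'dosage'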
def signal_idx_derivative(s, time_scale='seconds', periods=1)
-
Creates a series that contains the signal's index derivative, with the same divisions (if needed) as the original data and on the desired time scale.
Parameters
s : pandas.Series or dask.Series
    Series whose index derivative will be calculated.
time_scale : string or bool, default 'seconds'
    How to calculate the derivative, either with respect to the index values, on the time scale of 'seconds', 'minutes', 'hours', 'days', 'months' or 'years', or just sequentially, getting the difference between consecutive index values, False.
periods : int, default 1
    Defines the steps to take when calculating the derivative. When set to 1, it performs a normal backwards derivative. When set to -1, it performs a normal forwards derivative.
Returns
s_idx : pandas.Series or dask.Series
    Index derivative signal, on the desired time scale.
Expand source code
def signal_idx_derivative(s, time_scale='seconds', periods=1): '''Creates a series that contains the signal's index derivative, with the same divisions (if needed) as the original data and on the desired time scale. Parameters ---------- s : pandas.Series or dask.Series Series which will be analyzed for outlier detection. time_scale : bool, default 'seconds' How to calculate derivatives, either with respect to the index values, on the time scale of 'seconds', 'minutes', 'hours', 'days', 'months' or 'years', or just sequentially, just getting the difference between consecutive values, 'False'. Only used if parameter 'signal' isn't set to 'value'. periods : int, default 1 Defines the steps to take when calculating the derivative. When set to 1, it performs a normal backwards derivative. When set to 1, it performs a normal forwards derivative. Returns ------- s_idx : pandas.Series or dask.Series Index derivative signal, on the desired time scale. ''' # Calculate the signal index's derivative s_idx = s.index.to_series().diff() if isinstance(s_idx, dd.DataFrame): # Make the new derivative have the same divisions as the original signal s_idx = (s_idx.to_frame().rename(columns={s.index.name:'tmp_val'}) .reset_index() .set_index(s.index.name, sorted=True, divisions=s.divisions) .tmp_val) # Convert derivative to the desired time scale if time_scale == 'seconds': s_idx = s_idx.dt.seconds elif time_scale == 'minutes': s_idx = s_idx.dt.seconds / 60 elif time_scale == 'hours': s_idx = s_idx.dt.seconds / 3600 elif time_scale == 'days': s_idx = s_idx.dt.seconds / 86400 elif time_scale == 'months': s_idx = s_idx.dt.seconds / 2592000 return s_idx
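A short usage sketch for signal_idx_derivative, on a toy series with a datetime index:
import pandas as pd
from data_utils.data_processing import signal_idx_derivative

s = pd.Series([36.5, 37.0, 38.2],
              index=pd.to_datetime(['2020-01-01 00:00',
                                    '2020-01-01 00:10',
                                    '2020-01-01 00:40']))
# Time elapsed between consecutive samples, in minutes (NaN for the first sample)
dt_minutes = signal_idx_derivative(s, time_scale='minutes')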
def slopes_outlier_detect(s, max_thrs=4, bidir_sens=0.5, threshold_type='std', time_scale='seconds', only_bir=False)
-
Detects outliers based on large variations on the signal's derivatives, either in one direction or on both at the same time.
Parameters
s : pandas.Series or dask.Series
    Series which will be analyzed for outlier detection.
max_thrs : int or float, default 4
    Maximum threshold, i.e. no point can have a derivative whose magnitude deviates more than this threshold, in the signal that we're analyzing.
bidir_sens : float, default 0.5
    Dictates how much more sensitive the algorithm is when a deviation (i.e. large variation) is found on both sides of the data point / both directions of the derivative. In other words, it's a factor that will be multiplied by the usual one-directional threshold (max_thrs), from which the resulting value will be used as the bidirectional threshold.
threshold_type : string, default 'std'
    Determines if we're using threshold values with respect to the original scale of derivative values, 'absolute', relative to the derivative's mean, 'mean' or 'average', to the median, 'median', or to the standard deviation, 'std'. As such, the possible settings are ['absolute', 'mean', 'average', 'median', 'std'].
time_scale : string or bool, default 'seconds'
    How to calculate derivatives, either with respect to the index values, on the time scale of 'seconds', 'minutes', 'hours', 'days', 'months' or 'years', or just sequentially, getting the difference between consecutive values, False.
only_bir : bool, default False
    If set to True, the algorithm will only check for data points that have large derivatives in both directions.
Returns
outlier_s : pandas.Series or dask.Series
    Boolean series indicating where the detected outliers are.
Expand source code
def slopes_outlier_detect(s, max_thrs=4, bidir_sens=0.5, threshold_type='std', time_scale='seconds', only_bir=False): '''Detects outliers based on large variations on the signal's derivatives, either in one direction or on both at the same time. Parameters ---------- s : pandas.Series or dask.Series Series which will be analyzed for outlier detection. max_thrs : int or float Maximum threshold, i.e. no point can have a magnitude derivative value deviate more than this threshold, in the signal that we're analyzing. bidir_sens : float, default 0.5 Dictates how much more sensitive the algorithm is when a deviation (i.e. large variation) is found on both sides of the data point / both directions of the derivative. In other words, it's a factor that will be multiplied by the usual one-directional threshold (`max_thrs`), from which the resulting value will be used as the bidirectional threshold. threshold_type : string, default 'std' Determines if we're using threshold values with respect to the original scale of derivative values, 'absolute', relative to the derivative's mean, 'mean' or 'average', to the median, 'median' or to the standard deviation, 'std'. As such, the possible settings are ['absolute', 'mean', 'average', 'median', 'std']. time_scale : string or bool, default 'seconds' How to calculate derivatives, either with respect to the index values, on the time scale of 'seconds', 'minutes', 'hours', 'days', 'months' or 'years', or just sequentially, just getting the difference between consecutive values, 'False'. Only used if parameter 'signal' isn't set to 'value'. only_bir : bool, default False If set to True, the algorithm will only check for data points that have large derivatives on both directions. Returns ------- outlier_s : pandas.Series or dask.Series Boolean series indicating where the detected outliers are. 
''' # Calculate the difference between consecutive values bckwrds_deriv = s.diff() frwrds_deriv = s.diff(-1) if time_scale is not None: # Derivate by the index values bckwrds_deriv = bckwrds_deriv / signal_idx_derivative(bckwrds_deriv, time_scale, periods=1) frwrds_deriv = frwrds_deriv / signal_idx_derivative(frwrds_deriv, time_scale, periods=-1) if threshold_type.lower() == 'absolute': bckwrds_deriv = bckwrds_deriv frwrds_deriv = frwrds_deriv elif threshold_type.lower() == 'mean' or threshold_type.lower() == 'average': bckwrds_deriv_mean = bckwrds_deriv.mean() frwrds_deriv_mean = frwrds_deriv.mean() if isinstance(bckwrds_deriv, dd.DataFrame): # Make sure that the value is computed, in case we're using Dask bckwrds_deriv_mean = bckwrds_deriv_mean.compute() frwrds_deriv_mean = frwrds_deriv_mean.compute() # Normalize by the average value bckwrds_deriv = bckwrds_deriv / bckwrds_deriv_mean frwrds_deriv = frwrds_deriv / frwrds_deriv_mean elif threshold_type.lower() == 'median': bckwrds_deriv_median = bckwrds_deriv.median() frwrds_deriv_median = frwrds_deriv.median() if isinstance(bckwrds_deriv, dd.DataFrame): # Make sure that the value is computed, in case we're using Dask bckwrds_deriv_median = bckwrds_deriv_median.compute() frwrds_deriv_median = frwrds_deriv_median.compute() # Normalize by the median value bckwrds_deriv = bckwrds_deriv / bckwrds_deriv_median frwrds_deriv = frwrds_deriv / frwrds_deriv_median elif threshold_type.lower() == 'std': bckwrds_deriv_mean = bckwrds_deriv.mean() frwrds_deriv_mean = frwrds_deriv.mean() bckwrds_deriv_std = bckwrds_deriv.std() frwrds_deriv_std = frwrds_deriv.std() if isinstance(bckwrds_deriv, dd.DataFrame): # Make sure that the values are computed, in case we're using Dask bckwrds_deriv_mean = bckwrds_deriv_mean.compute() frwrds_deriv_mean = frwrds_deriv_mean.compute() bckwrds_deriv_std = bckwrds_deriv_std.compute() frwrds_deriv_std = frwrds_deriv_std.compute() # Normalize by the average and standard deviation values bckwrds_deriv = (bckwrds_deriv - bckwrds_deriv_mean) / bckwrds_deriv_std frwrds_deriv = (frwrds_deriv - frwrds_deriv_mean) / frwrds_deriv_std else: raise Exception('ERROR: Invalid value type. It must be "absolute", "mean", "average", "median" or "std", not {threshold_type}.') # Bidirectional threshold, to be used when observing both directions of the derivative bidir_max = bidir_sens * max_thrs if only_bir is True: # Search for outliers on both derivatives at the same time, always on their respective magnitudes outlier_s = (bckwrds_deriv.abs() > bidir_max) & (frwrds_deriv.abs() > bidir_max) else: # Search for outliers on each individual derivative, followed by both at the same time with a lower threshold, always on their respective magnitudes outlier_s = ((bckwrds_deriv.abs() > max_thrs) | (frwrds_deriv.abs() > max_thrs) | ((bckwrds_deriv.abs() > bidir_max) & (frwrds_deriv.abs() > bidir_max))) return outlier_s
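A minimal usage sketch for slopes_outlier_detect, using plain consecutive differences on a toy series with one spike:
import pandas as pd
from data_utils.data_processing import slopes_outlier_detect

s = pd.Series([70, 71, 69, 300, 70, 72, 71])
# time_scale=None skips the index-based scaling, so plain consecutive differences are used;
# the 300 spike shows a large jump in both directions and should be flagged
outliers = slopes_outlier_detect(s, max_thrs=2, threshold_type='std', time_scale=None)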
def standardize_missing_values(x, specific_nan_strings=[])
-
Apply function to be used in replacing missing value representations with the standard NumPy NaN value.
Parameters
x : str, int or float
    Value to be analyzed and replaced with NaN, if it has a missing value representation.
specific_nan_strings : list of strings, default []
    Parameter where the user can specify additional strings that should correspond to missing values.
Returns
x : str, int or float
    Corrected value, with standardized missing value representation.
Expand source code
def standardize_missing_values(x, specific_nan_strings=[]):
    '''Apply function to be used in replacing missing value representations with
    the standard NumPy NaN value.

    Parameters
    ----------
    x : str, int or float
        Value to be analyzed and replaced with NaN, if it has a missing value
        representation.
    specific_nan_strings : list of strings, default []
        Parameter where the user can specify additional strings that
        should correspond to missing values.

    Returns
    -------
    x : str, int or float
        Corrected value, with standardized missing value representation.
    '''
    if isinstance(x, str):
        if utils.is_string_nan(x, specific_nan_strings):
            return np.nan
        else:
            return x
    else:
        return x
def standardize_missing_values_df(df, see_progress=True, specific_nan_strings=[])
-
Replace all elements in a dataframe that have a missing value representation with the standard NumPy NaN value.
Parameters
df : pandas.DataFrame or dask.DataFrame
    Dataframe to be analyzed and have its content replaced with NaN, wherever a missing value representation is found.
see_progress : bool, default True
    If set to True, a progress bar will show up indicating the execution of the missing value standardization.
specific_nan_strings : list of strings, default []
    Parameter where the user can specify additional strings that should correspond to missing values.
Returns
df : pandas.DataFrame or dask.DataFrame
    Corrected dataframe, with standardized missing value representation.
Expand source code
def standardize_missing_values_df(df, see_progress=True, specific_nan_strings=[]):
    '''Replace all elements in a dataframe that have a missing value
    representation with the standard NumPy NaN value.

    Parameters
    ----------
    df : pandas.DataFrame or dask.DataFrame
        Dataframe to be analyzed and have its content replaced with NaN,
        wherever a missing value representation is found.
    see_progress : bool, default True
        If set to True, a progress bar will show up indicating the execution
        of the missing value standardization.
    specific_nan_strings : list of strings, default []
        Parameter where the user can specify additional strings that
        should correspond to missing values.

    Returns
    -------
    df : pandas.DataFrame or dask.DataFrame
        Corrected dataframe, with standardized missing value representation.
    '''
    for feature in utils.iterations_loop(df.columns, see_progress=see_progress):
        if isinstance(df, dd.DataFrame):
            df[feature] = df[feature].apply(lambda x: standardize_missing_values(x, specific_nan_strings),
                                            meta=df[feature]._meta.dtypes)
        elif isinstance(df, pd.DataFrame):
            df[feature] = df[feature].apply(lambda x: standardize_missing_values(x, specific_nan_strings))
        else:
            raise Exception(f'ERROR: Input "df" should either be a pandas dataframe or a dask dataframe, not type {type(df)}.')
    return df
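A minimal usage sketch for standardize_missing_values_df, with toy dataset-specific missing value strings:
import pandas as pd
from data_utils.data_processing import standardize_missing_values_df

df = pd.DataFrame({'drug_unit': ['mg', 'Not Obtainable', 'ml'],
                   'gender': ['male', 'unknown', 'female']})
# 'Not Obtainable' and 'unknown' are replaced with np.nan
df = standardize_missing_values_df(df, specific_nan_strings=['Not Obtainable', 'unknown'])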
def threshold_outlier_detect(s, max_thrs=None, min_thrs=None, threshold_type='absolute', signal_type='value', time_scale='seconds', derivate_direction='backwards')
-
Detects outliers based on predetermined thresholds.
Parameters
s : pandas.Series or dask.Series
    Series which will be analyzed for outlier detection.
max_thrs : int or float, default None
    Maximum threshold, i.e. no normal value can be larger than this threshold, in the signal (or its n-order derivative) that we're analyzing.
min_thrs : int or float, default None
    Minimum threshold, i.e. no normal value can be smaller than this threshold, in the signal (or its n-order derivative) that we're analyzing.
threshold_type : string, default 'absolute'
    Determines if we're using threshold values with respect to the original scale of values, 'absolute', relative to the signal's mean, 'mean' or 'average', to the median, 'median', or to the standard deviation, 'std'. As such, the possible settings are ['absolute', 'mean', 'average', 'median', 'std'].
signal_type : string, default 'value'
    Sets if we're analyzing the original signal value, 'value', its first derivative, 'derivative' or 'speed', or its second derivative, 'second derivative' or 'acceleration'. As such, the possible settings are ['value', 'derivative', 'speed', 'second derivative', 'acceleration'].
time_scale : string or bool, default 'seconds'
    How to calculate derivatives, either with respect to the index values, on the time scale of 'seconds', 'minutes', 'hours', 'days', 'months' or 'years', or just sequentially, getting the difference between consecutive values, False. Only used if parameter signal_type isn't set to 'value'.
derivate_direction : string, default 'backwards'
    The direction in which we calculate the derivative, either comparing to previous values, 'backwards', or to the next values, 'forwards'. As such, the possible settings are ['backwards', 'forwards']. Only used if parameter signal_type isn't set to 'value'.
Returns
outlier_s : pandas.Series or dask.Series
    Boolean series indicating where the detected outliers are.
Expand source code
def threshold_outlier_detect(s, max_thrs=None, min_thrs=None, threshold_type='absolute', signal_type='value', time_scale='seconds', derivate_direction='backwards'): '''Detects outliers based on predetermined thresholds. Parameters ---------- s : pandas.Series or dask.Series Series which will be analyzed for outlier detection. max_thrs : int or float, default None Maximum threshold, i.e. no normal value can be larger than this threshold, in the signal (or its n-order derivative) that we're analyzing. min_thrs : int or float, default None Minimum threshold, i.e. no normal value can be smaller than this threshold, in the signal (or its n-order derivative) that we're analyzing. threshold_type : string, default 'absolute' Determines if we're using threshold values with respect to the original scale of values, 'absolute', relative to the signal's mean, 'mean' or 'average', to the median, 'median' or to the standard deviation, 'std'. As such, the possible settings are ['absolute', 'mean', 'average', 'median', 'std']. signal_type : string, default 'value' Sets if we're analyzing the original signal value, 'value', its first derivative, 'derivative' or 'speed', or its second derivative, 'second derivative' or 'acceleration'. As such, the possible settings are ['value', 'derivative', 'speed', 'second derivative', 'acceleration']. time_scale : string or bool, default 'seconds' How to calculate derivatives, either with respect to the index values, on the time scale of 'seconds', 'minutes', 'hours', 'days', 'months' or 'years', or just sequentially, just getting the difference between consecutive values, 'False'. Only used if parameter 'signal' isn't set to 'value'. derivate_direction : string, default 'backwards' The direction in which we calculate the derivative, either comparing to previous values, 'backwards', or to the next values, 'forwards'. As such, the possible settings are ['backwards', 'forwards']. Only used if parameter 'signal' isn't set to 'value'. Returns ------- outlier_s : pandas.Series or dask.Series Boolean series indicating where the detected outliers are. ''' if signal_type.lower() == 'value': signal = s elif signal_type.lower() == 'derivative' or signal_type.lower() == 'speed': if derivate_direction.lower() == 'backwards': periods = 1 elif derivate_direction.lower() == 'forwards': periods = -1 else: raise Exception(f'ERROR: Invalid derivative direction. It must either be "backwards" or "forwards", not {derivate_direction}.') # Calculate the difference between consecutive values signal = s.diff(periods) if time_scale is not None: # Derivate by the index values signal = signal / signal_idx_derivative(signal, time_scale, periods) elif (signal_type.lower() == 'second derivative' or signal_type.lower() == 'acceleration'): if derivate_direction.lower() == 'backwards': periods = 1 elif derivate_direction.lower() == 'forwards': periods = -1 else: raise Exception(f'ERROR: Invalid derivative direction. It must either be "backwards" or "forwards", not {derivate_direction}.') # Calculate the difference between consecutive values signal = s.diff(periods).diff(periods) if time_scale is not None: # Derivate by the index values signal = signal / signal_idx_derivative(signal, time_scale, periods) else: raise Exception('ERROR: Invalid signal type. 
It must be "value", "derivative", "speed", "second derivative" or "acceleration", not {signal}.') if threshold_type.lower() == 'absolute': signal = signal elif threshold_type.lower() == 'mean' or threshold_type.lower() == 'average': signal_mean = signal.mean() if isinstance(signal, dd.DataFrame): # Make sure that the value is computed, in case we're using Dask signal_mean = signal_mean.compute() # Normalize by the average value signal = signal / signal_mean elif threshold_type.lower() == 'median': if isinstance(signal, dd.DataFrame): # Make sure that the value is computed, in case we're using Dask signal_median = signal.compute().median() else: signal_median = signal.median() # Normalize by the median value signal = signal / signal_median elif threshold_type.lower() == 'std': signal_mean = signal.mean() signal_std = signal.std() if isinstance(signal, dd.DataFrame): # Make sure that the values are computed, in case we're using Dask signal_mean = signal_mean.compute() signal_std = signal_std.compute() # Normalize by the average and standard deviation values signal = (signal - signal_mean) / signal_std else: raise Exception(f'ERROR: Invalid value type. It must be "absolute", "mean", "average", "median" or "std", not {threshold_type}.') # Search for outliers based on the given thresholds if max_thrs is not None and min_thrs is not None: outlier_s = (signal > max_thrs) | (signal < min_thrs) elif max_thrs is not None: outlier_s = signal > max_thrs elif min_thrs is not None: outlier_s = signal < min_thrs else: raise Exception('ERROR: At least a maximum or a minimum threshold must be set. Otherwise, no outlier will ever be detected.') return outlier_s
def transpose_dataframe(df, column_to_transpose=None, inplace=False)
-
Transpose a dataframe, either by its original index or through a specific column, which will be converted to the new column names (i.e. the header).
Parameters
df : pandas.DataFrame or dask.DataFrame
    Dataframe that will be transposed.
column_to_transpose : string, default None
    If specified, the given column will be used as the new column names, with its unique values forming the new dataframe's header. Otherwise, the dataframe will be transposed on its original index.
inplace : bool, default False
    If set to True, the original dataframe will be used and modified directly. Otherwise, a copy will be created and returned, without changing the original dataframe.
Returns
data_df : pandas.DataFrame or dask.DataFrame
    Transposed dataframe.
Expand source code
def transpose_dataframe(df, column_to_transpose=None, inplace=False):
    '''Transpose a dataframe, either by its original index or through a
    specific column, which will be converted to the new column names
    (i.e. the header).

    Parameters
    ----------
    df : pandas.DataFrame or dask.DataFrame
        Dataframe that will be transposed.
    column_to_transpose : string, default None
        If specified, the given column will be used as the new column names,
        with its unique values forming the new dataframe's header. Otherwise,
        the dataframe will be transposed on its original index.
    inplace : bool, default False
        If set to True, the original dataframe will be used and modified
        directly. Otherwise, a copy will be created and returned, without
        changing the original dataframe.

    Returns
    -------
    data_df : pandas.DataFrame or dask.DataFrame
        Transposed dataframe.
    '''
    if not inplace:
        # Make a copy of the data to avoid potentially unwanted changes to the original dataframe
        data_df = df.copy()
    else:
        # Use the original dataframe
        data_df = df
    if column_to_transpose is not None:
        # Set as index the column that has the desired column names as values
        data_df = data_df.set_index(column_to_transpose)
    if isinstance(data_df, pd.DataFrame):
        data_df = data_df.transpose()
    elif isinstance(data_df, dd.DataFrame):
        data_df = dd.from_pandas(data_df.compute().transpose(),
                                 npartitions=data_df.npartitions)
    else:
        raise Exception(f'ERROR: The input data must either be a Pandas dataframe or a Dask dataframe, not {type(df)}.')
    return data_df
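A short usage sketch for transpose_dataframe, pivoting on a column that holds the future header names (toy data):
import pandas as pd
from data_utils.data_processing import transpose_dataframe

df = pd.DataFrame({'lab_name': ['glucose', 'sodium'],
                   'value': [5.4, 140.0]})
# 'glucose' and 'sodium' become the columns of the transposed dataframe
wide_df = transpose_dataframe(df, column_to_transpose='lab_name')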