Source code for isaricanalytics.analytics

from __future__ import annotations

# Public API of this module, kept alphabetically sorted.  Note that
# ``mannwhitneyu`` and ``norm`` re-export the scipy.stats names imported below.
__all__ = [
    "convert_categorical_to_onehot",
    "convert_onehot_to_categorical",
    "create_grouped_results",
    "descriptive_comparison_table",
    "descriptive_table",
    "execute_cox_model",
    "execute_glm_regression",
    "execute_glmm_regression",
    "execute_kaplan_meier",
    "extend_dictionary",
    "format_descriptive_table_variables",
    "format_pvalue",
    "format_variables",
    "from_timeA_to_timeB",
    "get_chi2_pvalue",
    "get_counts",
    "get_descriptive_data",
    "get_fisher_exact_pvalue",
    "get_mean_and_stdev",
    "get_median_interquartile_range",
    "get_modelling_data",
    "get_n_percent_value",
    "get_parameter_ranking",
    "get_proportions",
    "get_pyramid_data",
    "get_upset_counts_intersections",
    "get_variables_by_section_and_type",
    "impute_miss_val",
    "lasso_var_sel_binary",
    "mannwhitneyu",
    "mean_std_str",
    "median_iqr_str",
    "n_percent_str",
    "norm",
    "regression_summary_table",
    "remove_single_binary_outcome_predictors",
    "rmv_high_corr",
    "rmv_low_var",
    "trim_field_label",
    "variance_influence_factor_backwards_elimination",
]


# -- IMPORTS --

# -- Standard libraries --
import typing
import warnings

# -- 3rd party libraries --
import numpy as np
import pandas
import statsmodels.api as sm
import statsmodels.formula.api as smf
from lifelines import CoxPHFitter, KaplanMeierFitter
from lifelines.statistics import logrank_test, multivariate_logrank_test
from scipy.stats import chi2_contingency, fisher_exact, mannwhitneyu, norm
from sklearn.linear_model import LogisticRegressionCV
from sklearn.metrics import accuracy_score, classification_report, confusion_matrix
from sklearn.model_selection import StratifiedKFold
from sklearn.preprocessing import LabelEncoder, StandardScaler
from statsmodels.genmod.bayes_mixed_glm import BinomialBayesMixedGLM
from statsmodels.stats.outliers_influence import variance_inflation_factor

# -- Internal libraries --


pd = pandas  # An alias to allow Pandas code refs to work independently
# of Pandas Intersphinx refs in type hinting and docstrings

############################################
############################################
# General preprocessing
############################################
############################################


def extend_dictionary(
    dictionary: pandas.DataFrame,
    new_variable_dict: dict[str, typing.Any],
    data: pandas.DataFrame,
    sep: str = "___",
) -> pandas.DataFrame:
    """:py:class:`pandas.DataFrame` : Returns the VERTEX dictionary with new custom variables added.

    Parameters
    ----------
    dictionary : pandas.DataFrame
        VERTEX dictionary containing columns ``"field_name"``, ``"form_name"``,
        ``"field_type"``, ``"field_label"``, ``"parent"``, ``"branching_logic"``.
    new_variable_dict : dict
        A dict with the same keys as the dictionary columns, the values for
        each item can be a string or a list.
    data : pandas.DataFrame
        Pandas dataframe containing the data for the project. The columns of
        this dataframe must include the variables in
        ``new_variable_dict["field_type"]``.
    sep : str
        Separator for creating new one-hot-encoded variable names.

    Returns
    -------
    pandas.DataFrame
        VERTEX dictionary containing the original variables, plus the new
        variables and any one-hot-encoded variables derived from this.
    """  # noqa : E501
    # Convert dict values to list if all are strings (otherwise a pandas error)
    if all(isinstance(v, str) for v in new_variable_dict.values()):
        new_variable_dict = {k: [v] for k, v in new_variable_dict.items()}
    new_dictionary = pd.DataFrame.from_dict(new_variable_dict)
    # Fractional "index" values are used to slot new rows between existing
    # dictionary rows; the column is dropped again before returning.
    new_dictionary["index"] = np.nan
    # reset_index(drop=False) keeps the original positions in an "index" column
    dictionary = dictionary.reset_index(drop=False)
    for ind in new_dictionary.index:
        parent = new_dictionary.loc[ind, "parent"]
        if parent not in new_dictionary["field_name"].values:
            if parent not in dictionary["field_name"].values:
                # Unknown parent: append just after the last dictionary row
                new_ind = dictionary["index"].max() + 0.1
            else:
                # Place the new row after the parent's subtree: walk down
                # while the current row is itself a parent of later rows.
                new_ind = dictionary.loc[
                    (dictionary["field_name"] == parent), "index"
                ].max()
                parent = dictionary.loc[new_ind, "field_name"]
                while parent in dictionary["parent"].values:
                    new_ind = dictionary.loc[
                        (dictionary["parent"] == parent), "index"
                    ].max()
                    parent = dictionary.loc[new_ind, "field_name"]
                new_ind = new_ind + 0.1
        else:
            # Parent is one of the new rows: place directly after it
            new_ind = (
                new_dictionary.loc[
                    (new_dictionary["field_name"] == parent), "index"
                ].max()
                + 0.1
            )
            # new_ind = new_ind + 0.1
        new_dictionary.loc[ind, "index"] = new_ind
    # For each new categorical variable present in the data, derive
    # one-hot-encoded binary entries (one per observed option).
    categorical_ind = new_dictionary["field_type"].isin(["categorical"])
    new_dictionary_list = [new_dictionary]
    ind_list = [
        ind
        for ind in categorical_ind.loc[categorical_ind].index
        if new_dictionary.loc[ind, "field_name"] in data.columns
    ]
    for ind in ind_list:
        variable = new_dictionary.loc[ind, "field_name"]
        options = data[variable].drop_duplicates().sort_values().dropna()
        # Booleans are not expanded into separate one-hot columns
        options = [y for y in options if y not in (True, False)]
        options = [
            y
            for y in options
            if (variable + sep + str(y)) not in new_dictionary["field_name"].values
        ]
        add_options = pd.DataFrame(
            columns=dictionary.columns, index=range(len(options))
        )
        add_options["field_name"] = [variable + sep + str(y) for y in options]
        add_options["form_name"] = new_dictionary.loc[ind, "form_name"]
        add_options["branching_logic"] = new_dictionary.loc[ind, "branching_logic"]
        add_options["field_type"] = "binary"
        add_options["field_label"] = options
        add_options["parent"] = variable
        # Spread the derived rows between the parent's index and the next
        # whole-number index so they sort directly under the parent.
        add_options["index"] = np.linspace(
            new_dictionary.loc[ind, "index"] + 0.2,
            np.ceil(new_dictionary.loc[ind, "index"]) - 0.1,
            len(options),
        )
        new_dictionary_list += [add_options]
    dictionary = pd.concat([dictionary] + new_dictionary_list, axis=0)
    dictionary = dictionary.sort_values(by="index").drop(columns="index")
    dictionary = dictionary.reset_index(drop=True)
    return dictionary
def get_variables_by_section_and_type(
    data: pandas.DataFrame,
    dictionary: pandas.DataFrame,
    required_variables: typing.Iterable[str] | None = None,
    include_sections: typing.Iterable[str] = ["demog"],
    include_types: typing.Iterable[str] = ["binary", "categorical", "numeric"],
    exclude_suffixes: typing.Iterable[str] = [
        "_units",
        "addi",
        "otherl2",
        "item",
        "_oth",
        "_unlisted",
        "otherl3",
    ],
    include_subjid: bool = False,
) -> list[str]:
    """:py:class:`list` : Returns a list of all variables in the dataframe from specified sections and types, plus any required variables.

    Parameters
    ----------
    data : pandas.DataFrame
        Incoming data.
    dictionary : pandas.DataFrame
        Data dictionary.
    required_variables : typing.Iterable, default=None
        Optional iterable of required variable names, defaults to ``None``.
        Any iterable is accepted (previously only a ``list`` was honoured).
    include_sections : typing.Iterable, default=["demog"]
        Optional iterable of names of sections to include, defaults to
        ``["demog"]``.
    include_types : typing.Iterable, default=["binary", "categorical", "numeric"]
        Optional iterable of variable type names, defaults to
        ``["binary", "categorical", "numeric"]``.
    exclude_suffixes : typing.Iterable, default=``["_units", "addi", "otherl2", "item", "_oth", "_unlisted", "otherl3"]``
        Optional iterable of suffixes to exclude.
    include_subjid : bool, default=False
        Optional boolean to indicate whether to include subject ID, defaults
        to ``False``.

    Returns
    -------
    list
        A list of all variables in the dataframe from specified sections and
        types, plus any required variables, in data-dictionary order.
    """  # noqa : E501
    if "section" in dictionary.columns:
        # TODO: this should always be True in future
        include_ind = dictionary["section"].isin(include_sections)
    else:
        # Fall back to matching field-name prefixes like "demog_".
        # (Bind the prefixes outside the lambda: the original shadowed the
        # lambda argument inside the generator expression.)
        section_prefixes = tuple(section + "_" for section in include_sections)
        include_ind = dictionary["field_name"].apply(
            lambda name: name.startswith(section_prefixes)
        )
    include_ind &= dictionary["field_type"].isin(include_types)
    # Exclude helper fields by suffix; str.endswith accepts a tuple directly.
    include_ind &= ~dictionary["field_name"].str.endswith(tuple(exclude_suffixes))
    # Required variables are always kept, regardless of section/type filters.
    if required_variables is not None:
        include_ind |= dictionary["field_name"].isin(list(required_variables))
    if include_subjid:
        include_ind |= dictionary["field_name"] == "subjid"
    include_variables = dictionary.loc[include_ind, "field_name"].tolist()
    # Keep only variables actually present in the data
    return [col for col in include_variables if col in data.columns]
def convert_categorical_to_onehot(
    data: pandas.DataFrame,
    dictionary: pandas.DataFrame,
    categorical_columns: typing.Iterable[str],
    sep: str = "___",
    missing_val: str = "nan",
    drop_first: bool = False,
) -> pandas.DataFrame:
    """:py:class:`pandas.DataFrame` : Returns the given dataframe with categorical variable columns converted to onehot-encoded variable columns.

    Parameters
    ----------
    data : pandas.DataFrame
        The incoming data.
    dictionary : pandas.DataFrame
        The data dictionary.
    categorical_columns : typing.Iterable
        An iterable of categorical column names.
    sep : str, default="___"
        Field-value separator, defaults to ``"___"``
    missing_val : str, default="nan"
        Optional value with which to replace missing values, defaults to
        ``"nan"``.
    drop_first : bool, default=False
        Optional boolean: if ``True``, drop the first one-hot column of each
        category (reference level) when no missing-value column was produced.
        Defaults to ``False``.

    Returns
    -------
    pandas.DataFrame
        The original dataframe with the categorical -> one-hot-encoded
        variable columns.
    """  # noqa : E501
    categorical_columns = [col for col in data.columns if col in categorical_columns]
    # NOTE(review): this .loc assignment fills NaNs in place on the caller's
    # dataframe before `data` is rebound below — confirm this side effect is
    # intended.
    data.loc[:, categorical_columns] = data[categorical_columns].fillna(missing_val)
    data = pd.get_dummies(data, columns=categorical_columns, prefix_sep=sep)
    for categorical_column in categorical_columns:
        onehot_columns = [
            var for var in data.columns if var.split(sep)[0] == categorical_column
        ]
        data[onehot_columns] = data[onehot_columns].astype(object)
        if (categorical_column + sep + missing_val) in data.columns:
            # Restore missingness: rows flagged in the missing-value dummy get
            # NaN across all one-hot columns, then the flag column is dropped.
            mask = data[categorical_column + sep + missing_val] == 1
            data.loc[mask, onehot_columns] = np.nan
            data = data.drop(columns=[categorical_column + sep + missing_val])
        elif (
            drop_first
            and isinstance(dictionary, pd.DataFrame)
            and "field_name" in dictionary.columns
        ):
            # Drop the first dictionary-listed one-hot child of this category
            # as the reference level.
            drop_column_ind = dictionary.apply(
                lambda x: (
                    (x["parent"] == categorical_column)
                    & (x["field_name"].split(sep)[0] == categorical_column)
                ),
                axis=1,
            )
            if drop_column_ind.any():
                data = data.drop(
                    columns=[dictionary.loc[drop_column_ind, "field_name"].values[0]]
                )
    if isinstance(dictionary, pd.DataFrame) and "field_name" in dictionary.columns:
        # Reorder columns: dictionary order first, then any extras.
        columns = [
            col for col in dictionary["field_name"].values if col in data.columns
        ]
        columns += [
            col for col in data.columns if col not in dictionary["field_name"].values
        ]
        data = data[columns]
    return data
[docs] def convert_onehot_to_categorical( data: pandas.DataFrame, dictionary: pandas.DataFrame, categorical_columns: typing.Iterable[str], sep: str = "___", missing_val: str = "nan", ) -> pandas.DataFrame: """:py:class:`pandas.DataFrame` : Returns the given dataframe with onehot-encoded variable columns to categorical variable columns. Parameters ---------- data : pandas.DataFrame The incoming data. dictionary : pandas.DataFrame Data dictionary. categorical_columns : typing.Iterable An iterable of categorical column names. sep : str, default="___" Optional field-value separator, defaults to ``"___"``. missing_val : str, default="nan" Optional value with which to replace missing values, defaults to ``"nan"``. Returns ------- pandas.DataFrame The original dataframe with the one-hot-encoded -> categorical variable columns. """ # noqa : E501 data = pd.concat([data, pd.DataFrame(columns=categorical_columns)], axis=1) for categorical_column in categorical_columns: onehot_columns = list( data.columns[data.columns.str.startswith(categorical_column + sep)] ) # Preserve missingness data.loc[:, categorical_column + sep + missing_val] = ( data[onehot_columns].any(axis=1) == 0 ) | (data[onehot_columns].isna().any(axis=1)) with pd.option_context("future.no_silent_downcasting", True): data.loc[:, onehot_columns] = data[onehot_columns].fillna(False) onehot_columns += [categorical_column + sep + missing_val] data.loc[:, categorical_column] = pd.from_dummies(data[onehot_columns], sep=sep) data = data.drop(columns=onehot_columns) columns = [col for col in dictionary["field_name"].values if col in data.columns] columns += [ col for col in data.columns if col not in dictionary["field_name"].values ] data = data[columns] return data
[docs] def from_timeA_to_timeB( data: pandas.DataFrame, dictionary: pandas.DataFrame, timeA_column: str, timeB_column: str, timediff_column: str, timediff_label: str, time_unit: str = "days", ) -> tuple[pd.DataFrame]: """:py:class:`tuple` : Returns the data and data dictionary updated with time difference calculation between two given data columns. Parameters ---------- data : pandas.DataFrame The incoming data. dictionary : pandas.DataFrame The data dictionary. timeA_column : str The name of the first time column. timeB_column : str The name of the second time column. timediff_column : str The name of the column for storing the time difference of the two given time columns. timediff_label : str A label to attach to the time difference column. time_unit : str, default="days" An optional indicator of which time unit to use for the time difference calculation, defaults to ``"days"``. Returns ------- tuple The data and data dictionary updated with time difference information. """ # noqa : E501 if time_unit == "days": data[timediff_column] = (data[timeB_column] - data[timeA_column]).dt.days elif time_unit == "months": data[timediff_column] = ( data[timeB_column].dt.year - data[timeA_column].dt.year ) * 12 + (data[timeB_column].dt.month - data[timeA_column].dt.month) elif time_unit == "months": data[timediff_column] = data[timeB_column].dt.year - data[timeA_column].dt.year time_dict = { "field_name": timediff_column, "form_name": dictionary.loc[ (dictionary["field_name"] == timeB_column), "field_name" ].values[0], "field_type": "numeric", "field_label": timediff_label, "parent": dictionary.loc[ (dictionary["field_name"] == timeB_column), "parent" ].values[0], } dictionary = extend_dictionary(dictionary, time_dict, data) return data, dictionary
############################################ ############################################ # Descriptive table ############################################ ############################################
[docs] def get_median_interquartile_range( series: pandas.Series, add_spaces: bool = False, dp: int = 1, mfw: int = 4, min_n: int = 3, ) -> str: """:py:class:`str` : Returns the median interquartile range (IQR) of a given series of values as a string. Parameters ---------- series : pandas.Series The input series for which to calculate the IQR. add_spaces : bool, default=False Add spacing in the string. dp : int, default=1 No description available. mfw : int, default=4 No description available. min_n : int, default=3 No description available. Returns ------- str The IQR as a string. """ # noqa : E501 if series.notna().sum() < min_n: output_str = "N/A" elif add_spaces: mfw_f = int(np.log10(max((series.quantile(0.75), 1)))) + 2 + dp output_str = "%*.*f" % (mfw_f, dp, series.median()) + " (" output_str += "%*.*f" % (mfw_f, dp, series.quantile(0.25)) + "-" output_str += "%*.*f" % (mfw_f, dp, series.quantile(0.75)) + ") | " output_str += "%*g" % (mfw, int(series.notna().sum())) else: output_str = "%.*f" % (dp, series.median()) + " (" output_str += "%.*f" % (dp, series.quantile(0.25)) + "-" output_str += "%.*f" % (dp, series.quantile(0.75)) + ") | " output_str += str(int(series.notna().sum())) return output_str
[docs] def median_iqr_str( series: pandas.Series, add_spaces: bool = False, dp: int = 1, mfw: int = 4, min_n: int = 3, ) -> str: """:py:class:`str` : Returns the median interquartile range (IQR) of a given series of values as a formatted string. .. warning:: DEPRECATED. Please use :py:func:`~isaricanalytics.analytics.get_median_interquartile_range` instead. Parameters ---------- series : pandas.Series The input series for which to calculate the IQR. add_spaces : bool, default=False Add spacing in the string. dp : int, default=1 No description available. mfw : int, default=4 No description available. min_n : int, default=3 No description available. Returns ------- str The IQR as a string. """ # noqa : E501 warnings.warn( ( "`analytics.median_iqr_str` is deprecated; " "please use `analytics.get_median_interquartile_range` instead." ), DeprecationWarning, stacklevel=2, ) return get_median_interquartile_range( series, add_spaces=add_spaces, dp=dp, mfw=mfw, min_n=min_n )
[docs] def get_mean_and_stdev( series: pandas.Series, add_spaces: bool = False, dp: int = 1, mfw: int = 4, min_n: int = 3, ) -> str: """:py:class:`str` : Returns the mean and standard deviation of a series of values as a formatted string. Parameters ---------- series : pandas.Series The input series for which to calculate the mean and st. dev. add_spaces : bool, default=False Add spacing in the string. dp : int, default=1 No description available. mfw : int, default=4 No description available. min_n : int, default=3 No description available. Returns ------- str The mean and st. dev. as a formatted string. """ # noqa : E501 if series.notna().sum() < min_n: output_str = "N/A" elif add_spaces: mfw_f = int(max((np.log10(series.mean(), 1)))) + 2 + dp output_str = "%*.*f" % (mfw_f, dp, series.mean()) + " (" output_str += "%*.*f" % (mfw_f, dp, series.std()) + ") | " output_str += "%*g" % (mfw, int(series.notna().sum())) else: output_str = "%.*f" % (dp, series.mean()) + " (" output_str += "%.*f" % (dp, series.std()) + ") | " output_str += str(int(series.notna().sum())) return output_str
[docs] def mean_std_str( series: pandas.Series, add_spaces: bool = False, dp: int = 1, mfw: int = 4, min_n: int = 3, ) -> str: """:py:class:`str` : Returns the mean and standard deviation of a series of values as a formatted string. .. warning:: DEPRECATED. Please use :py:func:`~isaricanalytics.analytics.get_mean_and_stdev` instead. Parameters ---------- series : pandas.Series The input series for which to calculate the mean and st. dev. add_spaces : bool, default=False Add spacing in the string. dp : int, default=1 No description available. mfw : int, default=4 No description available. min_n : int, default=3 No description available. Returns ------- str The mean and st. dev. as a formatted string. """ # noqa : E501 warnings.warn( ( "`analytics.mean_std_str` is deprecated; " "please use `analytics.get_mean_and_stdev` instead." ), DeprecationWarning, stacklevel=2, ) return get_mean_and_stdev( series, add_spaces=add_spaces, dp=dp, mfw=mfw, min_n=min_n )
[docs] def get_n_percent_value( series: pandas.Series, add_spaces: bool = False, dp: int = 1, mfw: int = 4, min_n: int = 1, ) -> str: """:py:class:`str` : Returns the n-percent value of a series as a string. Parameters ---------- series : pandas.Series The input series. add_spaces : bool, default=False Add spacing around the string. dp : int, default=1 No description available. mfw : int, default=1 No description available. min_n : int, default=1 No description available. Returns ------- str The n-percent value of the series as a string. """ # noqa : E501 if series.notna().sum() < min_n: output_str = "N/A" elif add_spaces: output_str = "%*g" % (mfw, int(series.sum())) + " (" percent = 100 * series.mean() if percent == 100: output_str += "100.0) | " else: output_str += "%5.*f" % (dp, percent) + ") | " output_str += "%*g" % (mfw, int(series.notna().sum())) else: count = str(int(series.sum())) percent = "%.*f" % (dp, 100 * series.mean()) denom = str(int(series.notna().sum())) output_str = f"{count} ({percent}) | {denom}" return output_str
[docs] def n_percent_str( series: pandas.Series, add_spaces: bool = False, dp: int = 1, mfw: int = 4, min_n: int = 1, ) -> str: """:py:class:`str` : Returns the n-percent value of a series as a string. .. warning:: DEPRECATED. Please use :py:func:`~isaricanalytics.analytics.get_n_percent_value` instead. Parameters ---------- series : pandas.Series The input series. add_spaces : bool, default=False Add spacing around the string. dp : int, default=1 No description available. mfw : int, default=1 No description available. min_n : int, default=1 No description available. Returns ------- str The n-percent value of the series as a string. """ # noqa : E501 warnings.warn( ( "`analytics.n_percent_str` is deprecated; " "please use `analytics.get_n_percent_value` instead." ), DeprecationWarning, stacklevel=2, ) return get_n_percent_value( series, add_spaces=add_spaces, dp=dp, mfw=mfw, min_n=min_n )
[docs] def get_chi2_pvalue( x: pandas.Series, y: pandas.Series, x_cat: typing.Iterable[typing.Any] = [True, False], y_cat: typing.Iterable[typing.Any] = [True, False], ) -> float: """:py:class:`float` : Returns the :math`p`-value for a Chi-squared test. Parameters ---------- x : pandas.Series The first series/factor. y : pandas.Series The second series/factor. x_cat : typing.Iterable An iterable of categories by which to group the first series. y_cat : typing.Iterable An interable of categories by which to group the second series. Returns ------- float The :math:`p`-value for the test. """ # noqa : E501 try: print(x.name) contingency = pd.crosstab( pd.Categorical(x.loc[x.notna()], categories=x_cat), pd.Categorical(y.loc[x.notna()], categories=y_cat), margins=False, dropna=False, ) pvalue = chi2_contingency(contingency).pvalue except ValueError: pvalue = np.nan return pvalue
[docs] def get_fisher_exact_pvalue( x: pandas.Series, y: pandas.Series, x_cat: typing.Iterable[typing.Any] = [True, False], y_cat: typing.Iterable[typing.Any] = [True, False], ): """:py:class:`float` : Returns the :math:`p`-value for a Fisher exact test. Parameters ---------- x : pandas.Series The first series/factor. y : pandas.Series The second series/factor. x_cat : typing.Iterable An iterable of categories by which to group the first series. y_cat : typing.Iterable An interable of categories by which to group the second series. Returns ------- float The :math:`p`-value for the test. """ # noqa : E501 try: contingency = pd.crosstab( pd.Categorical(x.loc[x.notna()], categories=x_cat), pd.Categorical(y.loc[x.notna()], categories=y_cat), margins=False, dropna=False, ) result = fisher_exact(contingency) if np.isinf(result.statistic) or np.isnan(result.statistic): pvalue = np.nan else: pvalue = result.pvalue except ValueError: pvalue = np.nan return pvalue
[docs] def format_pvalue( pvalue: float, dp: int = 3, min_val: float = 0.001, significance: dict[str, float] = {"*": 0.05, "**": 0.01}, ) -> str: """:py:class:`str` : Returns a formatted :math:`p`-value string. Parameters ---------- pvalue : float The :math:`p`-value. dp : int, default=3 Optional. No description available, defaults to :math:`3`. min_val : float, default=0.001 Optional. No description available, defaults to :math:`0.001`. significance : dict, default={"*": 0.05, "**": 0.01}. Dict of significance levels, defaults to ``{"*": 0.05, "**": 0.01}``. Returns ------- str The formatted :math:`p`-value string. """ # noqa : E501 if pvalue < min_val: pvalue_str = f"<{min_val}" elif np.isnan(pvalue): pvalue_str = "" else: pvalue_str = "%.*f" % (dp, pvalue) for key in significance: level = significance[key] min_threshold = max([v for k, v in significance.items() if (k != key)] + [0]) if min_threshold > level: min_threshold = 0 if (pvalue < level) & (pvalue > (min_threshold - 1e-8)): pvalue_str = pvalue_str + f" ({key})" return pvalue_str
def get_descriptive_data(
    data: pandas.DataFrame,
    dictionary: pandas.DataFrame,
    by_column: str | None = None,
    include_sections: typing.Iterable[str] = ["demog"],
    include_types: typing.Iterable[str] = ["binary", "categorical", "numeric"],
    exclude_suffix: typing.Iterable[str] = [
        "_units",
        "addi",
        "otherl2",
        "item",
        "_oth",
        "_unlisted",
        "otherl3",
    ],
    include_subjid: bool = False,
    exclude_negatives: bool = True,
    sep: str = "___",
) -> pandas.DataFrame:
    """:py:class:`pandas.DataFrame` : Returns descriptive data, with categorical variables one-hot-encoded.

    Parameters
    ----------
    data : pandas.DataFrame
        Incoming data.
    dictionary : pandas.DataFrame
        Data dictionary.
    by_column : str, default=None
        Optional grouping column to retain as categorical, defaults to
        ``None``.
    include_sections : typing.Iterable, default=["demog"]
        Optional list of names of sections to include, defaults to
        ``["demog"]``.
    include_types : typing.Iterable, default=["binary", "categorical", "numeric"]
        Optional iterable of variable type names.
    exclude_suffix : typing.Iterable, default=["_units", "addi", "otherl2", "item", "_oth", "_unlisted", "otherl3"]
        Optional iterable of suffixes to exclude.
    include_subjid : bool, default=False
        Optional boolean to indicate whether to include subject ID, defaults
        to ``False``.
    exclude_negatives : bool, default=True
        Optional boolean to indicate whether to drop negative ("no" / "never
        smoked") one-hot columns, defaults to ``True``.
    sep : str, default="___"
        Optional field-value separator, defaults to ``"___"``.

    Returns
    -------
    pandas.DataFrame
        Returns the descriptive data.
    """  # noqa : E501
    _data = data.copy()
    include_columns = get_variables_by_section_and_type(
        _data,
        dictionary,
        include_types=include_types,
        include_subjid=include_subjid,
        include_sections=include_sections,
        # FIX: the keyword was previously passed as `exclude_suffix=`, which
        # is not a parameter of get_variables_by_section_and_type and raised
        # a TypeError on every call.
        exclude_suffixes=exclude_suffix,
    )
    if (by_column is not None) and (by_column not in include_columns):
        include_columns = [by_column] + include_columns
    _data = _data[include_columns].dropna(axis=1, how="all").copy()
    # Convert categorical variables to onehot-encoded binary columns,
    # keeping the grouping column (if any) categorical.
    categorical_ind = dictionary["field_type"] == "categorical"
    columns = dictionary.loc[categorical_ind, "field_name"].tolist()
    columns = [col for col in columns if col != by_column]
    _data = convert_categorical_to_onehot(
        _data, dictionary, categorical_columns=columns, sep=sep
    )
    # If the grouping column arrived already one-hot-encoded, rebuild it
    if (by_column is not None) and (by_column not in _data.columns):
        _data = convert_onehot_to_categorical(
            _data, dictionary, categorical_columns=[by_column], sep=sep
        )
    negative_values = ("no", "never smoked")
    negative_columns = [
        col for col in _data.columns if col.split(sep)[-1].lower() in negative_values
    ]
    if exclude_negatives:
        _data.drop(columns=negative_columns, inplace=True)
    # Remove columns with only NaN values
    _data = _data.dropna(axis=1, how="all")
    if by_column is not None:
        _data.fillna({by_column: "Unknown"}, inplace=True)
    return _data
def descriptive_table(
    data: pandas.DataFrame,
    dictionary: pandas.DataFrame,
    by_column: str | None = None,
    include_totals: bool = True,
    column_reorder: typing.Iterable[str] | None = None,
    include_raw_variable_name: bool = False,
    sep: str = "___",
) -> tuple[pandas.DataFrame, str]:
    """:py:class:`tuple` : Returns the descriptive table and table key for binary and numerical variables in the data.

    The descriptive table will have separate columns for each category that
    exists for the ``by_column`` variable, if this is provided.

    Parameters
    ----------
    data : pandas.DataFrame
        Incoming data.
    dictionary : pandas.DataFrame
        Data dictionary.
    by_column : str, default=None
        Optional grouping column, defaults to ``None``.
    include_totals : bool, default=True
        Whether to prepend a totals row, defaults to ``True``.
    column_reorder : typing.Iterable, default=None
        Optional iterable of names of columns to reorder by, defaults to
        ``None``.
    include_raw_variable_name : bool, default=False
        Optional boolean indicating whether to include the raw variable name,
        defaults to ``False``.
    sep : str, default="___"
        Optional field-value separator, defaults to ``"___"``.

    Returns
    -------
    tuple
        Returns the descriptive table and table key for binary and numerical
        variables in the data.
    """  # noqa : E501
    _data = data.copy()
    numeric_ind = dictionary["field_type"] == "numeric"
    numeric_columns = dictionary.loc[numeric_ind, "field_name"].tolist()
    numeric_columns = [col for col in numeric_columns if col in _data.columns]
    binary_ind = dictionary["field_type"] == "binary"
    binary_columns = dictionary.loc[binary_ind, "field_name"].tolist()
    binary_columns = [col for col in binary_columns if col in _data.columns]
    # Add rows for section headers and categorical (parent) questions
    index = numeric_columns + binary_columns
    index += dictionary.loc[(dictionary["field_name"].isin(index)), "parent"].tolist()
    table_dictionary = dictionary.loc[(dictionary["field_name"].isin(index))]
    index = table_dictionary["field_name"].tolist()
    table_columns = ["Variable", "All"]
    if by_column is not None:
        add_columns = list(_data[by_column].unique())
        if column_reorder is not None:
            table_columns += [col for col in column_reorder if col in add_columns]
            table_columns += [col for col in add_columns if col not in column_reorder]
        else:
            table_columns += add_columns
    table_columns += ["Raw variable name"]
    table = pd.DataFrame("", index=index, columns=table_columns)
    table["Raw variable name"] = [var if var in _data.columns else "" for var in index]
    table["Variable"] = format_descriptive_table_variables(
        table_dictionary, sep=sep
    ).tolist()
    mfw = int(np.log10(_data.shape[0])) + 1  # Min field width, for formatting
    # FIX: call the current helpers rather than the deprecated
    # median_iqr_str / n_percent_str wrappers, which emitted a
    # DeprecationWarning per column from the library's own code.
    table.loc[numeric_columns, "All"] = _data[numeric_columns].apply(
        get_median_interquartile_range, mfw=mfw
    )
    table.loc[binary_columns, "All"] = _data[binary_columns].apply(
        get_n_percent_value, mfw=mfw
    )
    totals = pd.DataFrame(columns=table_columns, index=["totals"])
    totals["Variable"] = "<b>Totals</b>"
    totals["All"] = _data.shape[0]
    if by_column is not None:
        choices_values = _data[by_column].unique()
        for value in choices_values:
            ind = _data[by_column] == value
            # NOTE(review): np.log10 fails if a group is empty (ind.sum()==0);
            # unique() on the same column should prevent that — confirm.
            mfw = int(np.log10(ind.sum())) + 1  # Min field width, for format
            table.loc[numeric_columns, value] = _data.loc[ind, numeric_columns].apply(
                get_median_interquartile_range, mfw=mfw
            )
            table.loc[binary_columns, value] = _data.loc[ind, binary_columns].apply(
                get_n_percent_value, mfw=mfw
            )
            totals[value] = ind.sum()
    table = table.reset_index(drop=True)
    if include_totals:
        table = pd.concat([totals, table], axis=0).reset_index(drop=True)
    table_key = "<b>KEY</b><br>(*) Count (%) | N<br>(+) Median (IQR) | N"
    if include_raw_variable_name is False:
        table.drop(columns=["Raw variable name"], inplace=True)
    return table, table_key
def descriptive_comparison_table(
    data: pandas.DataFrame,
    dictionary: pandas.DataFrame,
    by_column: str | None = None,
    include_totals: bool = True,
    column_reorder: typing.Iterable[str] | None = None,
    sep: str = "___",
    pvalue_significance: dict[str, float] = {"*": 0.05, "**": 0.01},
) -> tuple[pandas.DataFrame, str]:
    """:py:class:`tuple` : Returns the descriptive comparison table and table key for binary (including onehot-encoded categorical) and numerical variables in data.

    The descriptive table will have separate columns for each category that
    exists for the ``by_column`` variable, if this is provided.  When
    ``by_column`` yields exactly two groups, a p-value column is populated
    (Mann-Whitney for numeric variables, Fisher exact for binary ones).

    Parameters
    ----------
    data : pandas.DataFrame
        Incoming data.
    dictionary : pandas.DataFrame
        Data dictionary.
    by_column : str, default=None
        Optional grouping column, defaults to ``None``.
    include_totals : bool, default=True
        Whether to prepend a totals row, defaults to ``True``.
    column_reorder : typing.Iterable, default=None
        Optional iterable of names of columns to reorder by, defaults to
        ``None``.
    sep : str, default="___"
        Optional field-value separator, defaults to ``"___"``.
    pvalue_significance : dict, default={"*": 0.05, "**": 0.01}
        A dict of significance levels, defaults to ``{"*": 0.05, "**": 0.01}``.

    Returns
    -------
    tuple
        Returns the descriptive comparison table and table key for binary and
        numerical variables in the data.
    """  # noqa : E501
    _data = data.copy()
    # FIX: `pvalues` must exist even when no two-group comparison is run
    # (by_column is None, or not exactly two groups); previously the
    # table-key construction below raised a NameError in those cases.
    pvalues = pd.Series(dtype=float)
    numeric_ind = dictionary["field_type"] == "numeric"
    numeric_columns = dictionary.loc[numeric_ind, "field_name"].tolist()
    numeric_columns = [col for col in numeric_columns if col in _data.columns]
    binary_ind = dictionary["field_type"] == "binary"
    binary_columns = dictionary.loc[binary_ind, "field_name"].tolist()
    binary_columns = [col for col in binary_columns if col in _data.columns]
    # Add rows for section headers and categorical (parent) questions
    index = numeric_columns + binary_columns
    index += dictionary.loc[(dictionary["field_name"].isin(index)), "parent"].tolist()
    table_dictionary = dictionary.loc[(dictionary["field_name"].isin(index))]
    index = table_dictionary["field_name"].tolist()
    table_columns = ["Variable", "All"]
    if by_column is not None:
        add_columns = list(_data[by_column].unique())
        if column_reorder is not None:
            table_columns += [col for col in column_reorder if col in add_columns]
            table_columns += [col for col in add_columns if col not in column_reorder]
        else:
            table_columns += add_columns
    table_columns += ["p-value"]
    table = pd.DataFrame("", index=index, columns=table_columns)
    table["Variable"] = format_descriptive_table_variables(
        table_dictionary, sep=sep, binary_symbol="†"
    ).tolist()
    mfw = int(np.log10(_data.shape[0])) + 1  # Min field width, for formatting
    # FIX: call the current helpers rather than the deprecated
    # median_iqr_str / n_percent_str wrappers (avoids DeprecationWarnings).
    table.loc[numeric_columns, "All"] = _data[numeric_columns].apply(
        get_median_interquartile_range, mfw=mfw
    )
    table.loc[binary_columns, "All"] = _data[binary_columns].apply(
        get_n_percent_value, mfw=mfw
    )
    totals = pd.DataFrame(columns=table_columns, index=["totals"])
    totals["Variable"] = "<b>Totals</b>"
    totals["All"] = _data.shape[0]
    if by_column is not None:
        by_column_values = _data[by_column].unique()
        for value in by_column_values:
            ind = _data[by_column] == value
            mfw = int(np.log10(ind.sum())) + 1  # Min field width, for format
            table.loc[numeric_columns, value] = _data.loc[ind, numeric_columns].apply(
                get_median_interquartile_range, mfw=mfw
            )
            table.loc[binary_columns, value] = _data.loc[ind, binary_columns].apply(
                get_n_percent_value, mfw=mfw
            )
            totals[value] = ind.sum()
        if len(by_column_values) == 2:
            y = _data[by_column] == by_column_values[0]
            pvalues = pd.Series(index=table.index)
            pvalues.loc[numeric_columns] = _data[numeric_columns].apply(
                lambda x: mannwhitneyu(x, y, nan_policy="omit").pvalue, axis=0
            )
            # NOTE(review): binary p-values use Fisher's exact test although
            # the table key text says "Chi-squared" — confirm which is meant.
            pvalues.loc[binary_columns] = _data[binary_columns].apply(
                lambda x: get_fisher_exact_pvalue(x, y), axis=0
            )
            table["p-value"] = pvalues.apply(
                format_pvalue, significance=pvalue_significance
            )
    table = table.reset_index(drop=True)
    if include_totals:
        table = pd.concat([totals, table], axis=0).reset_index(drop=True)
    table_key = """<b>KEY</b><br>(†) Count (%) | N, p-value using Chi-squared test, (+) Median (IQR) | N, p-value using Mann-Whitney test<br>"""
    # Only list significance markers that actually occur (empty `pvalues`
    # contributes nothing).
    table_key += ", ".join(
        [
            f"({k}) p < {str(v)}"
            for k, v in pvalue_significance.items()
            if (pvalues < v).any()
        ]
    )
    return table, table_key
############################################ ############################################ # Formatting ############################################ ############################################
def trim_field_label(x: str, max_len: int = 40) -> str:
    """:py:class:`str` : Trims a field label to at most ``max_len`` characters.

    Trimming happens at a word boundary: the longest whole-word prefix of
    ``x[:max_len]`` is kept and ``" ..."`` is appended. Labels that already
    fit are returned unchanged.

    Parameters
    ----------
    x : str
        The input field label.
    max_len : int, default=40
        An optional maximum length parameter to use for trimming, defaults
        to :math:`40`.

    Returns
    -------
    str
        The trimmed field label.
    """  # noqa : E501
    if len(x) <= max_len:
        return x
    prefix = x[:max_len]
    if " " not in prefix:
        # No word boundary inside the window: fall back to a hard cut so a
        # single long token is not reduced to just " ..." (previous
        # behaviour dropped the whole label in this case).
        return prefix + " ..."
    return " ".join(prefix.split(" ")[:-1]) + " ..."
def format_descriptive_table_variables(
    dictionary: pandas.DataFrame,
    max_len: int = 100,
    add_key: bool = True,
    sep: str = "___",
    binary_symbol: str = "*",
    numeric_symbol: str = "+",
) -> pandas.Series:
    """:py:class:`pandas.Series` : Returns formatted descriptive-table labels.

    Builds one HTML-formatted label per dictionary row: top-level variables
    are wrapped in ``<b>`` tags, answer-level variables (field names
    containing ``sep``) are indented with an arrow, sections are italicised,
    and (optionally) a key symbol is appended by field type.

    Parameters
    ----------
    dictionary : pandas.DataFrame
        The data dictionary with ``field_name``, ``field_type`` and
        ``field_label`` columns.
    max_len : int, default=100
        Optional maximum length of field labels, defaults to :math:`100`.
    add_key : bool, default=True
        Whether to append a key symbol per field type, defaults to ``True``.
    sep : str, default="___"
        Optional field-value separator, defaults to ``"___"``.
    binary_symbol : str, default="*"
        Symbol appended to binary/categorical fields, defaults to ``"*"``.
    numeric_symbol : str, default="+"
        Symbol appended to numeric fields, defaults to ``"+"``.

    Returns
    -------
    pandas.Series
        One formatted label per dictionary row (the previous annotation said
        ``str``, but a Series has always been returned).
    """  # noqa : E501
    # Answer-level rows (names containing ``sep``) are indented with an
    # arrow; top-level rows are wrapped in <b> tags.
    name = dictionary["field_name"].apply(lambda x: " ↳ " if sep in x else "<b>")
    name += dictionary["field_type"].map({"section": "<i>"}).fillna("")
    name += (
        dictionary["field_label"]
        .apply(lambda x: x.split(":")[-1] if x.startswith("If") else x)
        .apply(trim_field_label, max_len=max_len)
    )
    name += dictionary["field_type"].map({"section": "</i>"}).fillna("")
    name += dictionary["field_name"].apply(lambda x: "" if sep in x else "</b>")
    if add_key is True:
        field_type = (
            dictionary["field_type"]
            .map(
                {
                    "categorical": f" ({binary_symbol})",
                    "binary": f" ({binary_symbol})",
                    "numeric": f" ({numeric_symbol})",
                }
            )
            .fillna("")
        )
        # Only top-level rows get a key symbol; multiplying a string Series
        # by a boolean Series blanks the answer-level rows.
        name += field_type * (dictionary["field_name"].str.contains(sep) == 0)
    return name
def format_variables(
    dictionary: pandas.DataFrame, max_len: int = 40, sep: str = "___"
) -> pandas.Series:
    """:py:class:`pandas.Series` : Returns formatted variable labels.

    Top-level variables are formatted as ``<b>label</b>``; answer-level
    variables (field names containing ``sep``) are formatted as
    ``<b>parent label</b>, answer label``.

    Parameters
    ----------
    dictionary : pandas.DataFrame
        The data dictionary with ``field_name``, ``field_label`` and
        ``parent`` columns.
    max_len : int, default=40
        Optional maximum length of field labels, defaults to :math:`40`.
    sep : str, default="___"
        Optional field-value separator, defaults to ``"___"``.

    Returns
    -------
    pandas.Series
        One formatted label per dictionary row (the previous annotation said
        ``str``, but a Series has always been returned).
    """  # noqa : E501

    def get_parent_label(x):
        # Parent may be NaN or non-string for top-level rows.
        if pd.isna(x) or not isinstance(x, str):
            return ""
        match = dictionary.loc[dictionary["field_name"] == x, "field_label"]
        return match.iloc[0] if not match.empty else ""

    parent_label = dictionary["parent"].apply(get_parent_label)
    parent_name = parent_label.apply(trim_field_label, max_len=max_len)
    name = (
        dictionary["field_label"]
        .apply(lambda x: x.split(":")[-1] if x.startswith("If") else x)
        .apply(trim_field_label, max_len=max_len)
    )
    answer_ind = dictionary["field_name"].str.contains(sep)
    # Multiplying a string Series by a boolean Series selects one of the two
    # label forms per row.
    name = ("<b>" + parent_name + "</b>, " + name) * answer_ind + (
        "<b>" + name + "</b>"
    ) * (~answer_ind)
    return name
############################################ ############################################ # Counts ############################################ ############################################
def get_counts(
    data: pandas.DataFrame,
    dictionary: pandas.DataFrame,
    max_n_variables: int = 10,
    sep: str = "___",
) -> pandas.DataFrame:
    """:py:class:`pandas.DataFrame` : Returns per-variable column counts.

    Sums each column of ``data``, keeps the ``max_n_variables`` largest
    counts, and attaches long and short formatted labels from the
    dictionary.

    Parameters
    ----------
    data : pandas.DataFrame
        The incoming data.
    dictionary : pandas.DataFrame
        The data dictionary.
    max_n_variables : int, default=10
        Optional cap on the number of variables returned, defaults to
        :math:`10`.
    sep : str, default="___"
        Optional field-value separator, defaults to ``"___"``.

    Returns
    -------
    pandas.DataFrame
        The variable column counts dataframe with ``variable``, ``count``,
        ``label`` and ``short_label`` columns.
    """  # noqa : E501
    counts = data.apply(lambda col: col.sum()).reset_index()
    counts.columns = ["variable", "count"]
    counts = (
        counts.sort_values(by=["count"], ascending=False)
        .reset_index(drop=True)
        .head(max_n_variables)
    )
    # Map field names onto their formatted labels.
    label_map = dict(
        zip(dictionary["field_name"], format_variables(dictionary, max_len=1000, sep=sep))
    )
    short_label_map = dict(
        zip(dictionary["field_name"], format_variables(dictionary, max_len=40, sep=sep))
    )
    counts["label"] = counts["variable"].map(label_map)
    counts["short_label"] = counts["variable"].map(short_label_map)
    return counts
def get_proportions(
    data: pandas.DataFrame,
    dictionary: pandas.DataFrame,
    max_n_variables: int = 10,
    ignore_branching_logic: bool = False,
    branching_logic: str = "",
    sep: str = "___",
) -> pandas.DataFrame:
    """:py:class:`pandas.DataFrame` : Returns proportional counts per variable.

    Computes, for each selected column, the proportion of affirmative
    (non-zero) answers among non-null entries, keeping the columns with the
    ``max_n_variables`` largest raw counts.

    Parameters
    ----------
    data : pandas.DataFrame
        The incoming data.
    dictionary : pandas.DataFrame
        The data dictionary.
    max_n_variables : int, default=10
        Optional cap on the number of variables returned, defaults to
        :math:`10`.
    ignore_branching_logic : bool, default=False
        When ``True``, use every column of ``data``; otherwise restrict to
        columns whose dictionary ``branching_logic`` matches.
    branching_logic : str, default=""
        Branching-logic expression used to select dictionary rows.
    sep : str, default="___"
        Optional field-value separator, defaults to ``"___"``.

    Returns
    -------
    pandas.DataFrame
        A dataframe with ``variable``, ``proportion``, ``label`` and
        ``short_label`` columns, sorted by descending proportion.
    """  # noqa : E501
    if ignore_branching_logic:
        selected = list(data.columns)
    else:
        matching = set(
            dictionary.loc[
                (dictionary["branching_logic"] == branching_logic), "field_name"
            ]
        )
        selected = [col for col in data.columns if col in matching]
        if len(selected) == 0:
            # Fall back to the branching logic of the first dictionary entry
            # that matches a data column.
            branching_logic = dictionary.loc[
                dictionary["field_name"].isin(data.columns), "branching_logic"
            ].values[0]
            matching = set(
                dictionary.loc[
                    (dictionary["branching_logic"] == branching_logic), "field_name"
                ]
            )
            selected = [col for col in data.columns if col in matching]
    # Each column yields a (proportion, count) pair; column-wise apply
    # expands the pairs into two rows which .T turns into two columns.
    proportions = (
        data[selected].apply(lambda x: (x.sum() / x.count(), x.sum())).T.reset_index()
    )
    proportions.columns = ["variable", "proportion", "count"]
    proportions = (
        proportions.sort_values(by=["count"], ascending=False)
        .reset_index(drop=True)
        .head(max_n_variables)
        .drop(columns="count")
        .sort_values(by=["proportion"], ascending=False)
        .reset_index(drop=True)
    )
    label_map = dict(
        zip(dictionary["field_name"], format_variables(dictionary, max_len=1000, sep=sep))
    )
    short_label_map = dict(
        zip(dictionary["field_name"], format_variables(dictionary, max_len=40, sep=sep))
    )
    proportions["label"] = proportions["variable"].map(label_map)
    proportions["short_label"] = proportions["variable"].map(short_label_map)
    return proportions
def get_upset_counts_intersections(
    data: pandas.DataFrame,
    dictionary: pandas.DataFrame,
    variables: list[str] | None = None,
    n_variables: int = 5,
    sep: str = "___",
) -> tuple[pandas.DataFrame, pandas.DataFrame]:
    """:py:class:`tuple` : Returns counts and intersections for an UpSet plot.

    Restricts ``data`` to binary variables with at least one affirmative
    value, counts each variable, and counts every observed combination of
    affirmative values across the top ``n_variables`` variables.

    Parameters
    ----------
    data : pandas.DataFrame
        The incoming data.
    dictionary : pandas.DataFrame
        The data dictionary.
    variables : list, default=None
        Optional list of names of variable columns for which to take counts,
        defaults to ``None`` (all columns of ``data``).
    n_variables : int, default=5
        Optional limit for the number of variable columns, defaults to
        :math:`5`.
    sep : str, default="___"
        Optional field-value separator, defaults to ``"___"``.

    Returns
    -------
    tuple
        ``(counts, intersections)`` dataframes: per-variable counts and
        per-combination counts.
    """  # noqa : E501
    # Convert variables and column names into their formatted names
    long_format = format_variables(dictionary, max_len=1000, sep=sep)
    short_format = format_variables(dictionary, max_len=40, sep=sep)
    format_dict = dict(zip(dictionary["field_name"], long_format))
    short_format_dict = dict(zip(dictionary["field_name"], short_format))
    if variables is None:
        variables = data.columns.tolist()
    # Keep only binary variables that occur at least once.
    binary_columns = dictionary.loc[
        (dictionary["field_type"] == "binary"), "field_name"
    ].tolist()
    variables = [col for col in variables if col in binary_columns]
    variables = [var for var in variables if data[var].sum() > 0]
    data = data[variables].astype(float).fillna(0)
    # Per-variable counts, largest first.
    counts = data.sum().astype(int).reset_index().rename(columns={0: "count"})
    counts = counts.sort_values(by="count", ascending=False).reset_index(drop=True)
    counts["short_label"] = counts["index"].map(short_format_dict)
    counts["label"] = counts["index"].map(format_dict)
    # Rank of each variable in descending-count order; used for sorting
    # intersections below.
    variable_order_dict = dict(zip(counts["index"], counts.index))
    variables = counts["index"].tolist()
    if n_variables is not None:
        variables = variables[:n_variables]
    data = data[variables]
    counts = counts.loc[counts["index"].isin(variables)]
    # One row per observed combination of affirmative values (rows that are
    # all zero are excluded by the any(axis=1) mask).
    intersections = data.loc[data.any(axis=1)].value_counts().reset_index()
    intersections["index"] = intersections.drop(columns="count").apply(
        lambda x: tuple(col for col in x.index if x[col] == 1), axis=1
    )
    intersections["label"] = intersections["index"].apply(
        lambda x: tuple(format_dict[y] for y in x)
    )
    # The rest is reordering to make it look prettier
    intersections = intersections.loc[(intersections["count"] > 0)]
    intersections["index_n"] = intersections["index"].apply(len)
    # First/last member of each combination, by overall count rank.
    intersections["index_first"] = (
        intersections[variables].idxmax(axis=1).map(variable_order_dict)
    )
    intersections["index_last"] = (
        intersections[variables].idxmin(axis=1).map(variable_order_dict)
    )
    intersections = intersections.sort_values(
        by=["count", "index_first", "index_last", "index_n"],
        ascending=[False, True, False, False],
    )
    keep_columns = ["index", "label", "count"]
    intersections = intersections[keep_columns].reset_index(drop=True)
    return counts, intersections
def get_pyramid_data(
    data: pandas.DataFrame,
    column_dict: dict[str, str],
    left_side: str = "Female",
    right_side: str = "Male",
) -> pandas.DataFrame:
    """:py:class:`pandas.DataFrame` : Returns dual stack pyramid data.

    Groups subjects by the side / y-axis / stack-group columns named in
    ``column_dict``, counts subjects per group, keeps only the two requested
    sides, and flags which rows belong to the left side.

    Parameters
    ----------
    data : pandas.DataFrame
        The incoming data; must contain a ``subjid`` column.
    column_dict : dict
        Dict with keys ``side``, ``y_axis`` and ``stack_group`` mapping to
        data column names.
    left_side : str, default="Female"
        Optional label for the left side of the pyramid, defaults to
        ``"Female"``.
    right_side : str, default="Male"
        Optional label for the right side of the pyramid, defaults to
        ``"Male"``.

    Returns
    -------
    pandas.DataFrame
        Pyramid rows with ``side``, ``y_axis``, ``stack_group``, ``value``
        and ``left_side`` columns, sorted by ``y_axis``.
    """
    required_keys = ["side", "y_axis", "stack_group"]
    group_columns = [column_dict[key] for key in required_keys]
    # Count subjects per (side, y_axis, stack_group) cell, then rename the
    # data columns back to the generic pyramid keys.
    pyramid = (
        data[["subjid"] + group_columns]
        .copy()
        .groupby(group_columns, observed=True)
        .count()
        .reset_index()
        .rename(columns={"subjid": "value"})
        .rename(columns={name: key for key, name in column_dict.items()})
    )
    pyramid = pyramid.loc[pyramid["side"].isin([left_side, right_side])]
    pyramid.loc[:, "left_side"] = pyramid["side"] == left_side
    return pyramid.sort_values(by="y_axis").reset_index(drop=True)
############################################ ############################################ # Logistic Regression from Risk Factors ############################################ ############################################
def get_modelling_data(
    data: pandas.DataFrame,
    dictionary: pandas.DataFrame,
    outcome_columns: str | typing.Iterable[str],
    include_sections: typing.Iterable[str] = [
        "demog",
        "comor",
        "adsym",
        "vacci",
        "vital",
        "sympt",
        "labs",
    ],
    required_variables: typing.Iterable[str] | None = None,
    include_types: typing.Iterable[str] = ["binary", "categorical", "numeric"],
    exclude_suffix: typing.Iterable[str] = [
        "_units",
        "addi",
        "otherl2",
        "item",
        "_oth",
        "_unlisted",
        "otherl3",
    ],
    include_subjid: bool = False,
    exclude_negatives: bool = True,
    fillna: bool = True,
    drop_first: bool = False,
    sep: str = "___",
) -> pandas.DataFrame:
    """:py:class:`pandas.DataFrame` : Returns data prepared for modelling.

    Selects the outcome and predictor columns, onehot-encodes categorical
    predictors, optionally fills missing binary values with ``False``, and
    optionally drops negative-answer columns (e.g. ``..._no``).

    NOTE(review): the list defaults above are never mutated by this function,
    so the shared-mutable-default pitfall does not apply here.

    Parameters
    ----------
    data : pandas.DataFrame
        Incoming data.
    dictionary : pandas.DataFrame
        Data dictionary.
    outcome_columns : typing.Iterable
        Outcome column name(s); a single string is also accepted.
    include_sections : typing.Iterable, default=["demog", "comor", "adsym", "vacci", "vital", "sympt", "labs"]
        Optional list of names of sections to include.
    required_variables : typing.Iterable, default=None
        Required variable column names, defaults to ``None``.
    include_types : typing.Iterable, default=["binary", "categorical", "numeric"]
        Optional iterable of variable type names.
    exclude_suffix : typing.Iterable, default=["_units", "addi", "otherl2", "item", "_oth", "_unlisted", "otherl3"]
        Optional iterable of suffixes to exclude.
    include_subjid : bool, default=False
        Whether to include the subject ID column, defaults to ``False``.
    exclude_negatives : bool, default=True
        Whether to drop negative-answer columns, defaults to ``True``.
    fillna : bool, default=True
        Whether to fill missing binary values with ``False``, defaults to
        ``True``.
    drop_first : bool, default=False
        Whether to drop the first onehot column per categorical variable,
        defaults to ``False``.
    sep : str, default="___"
        Optional field-value separator, defaults to ``"___"``.

    Returns
    -------
    pandas.DataFrame
        The modelling data.
    """  # noqa : E501
    _data = data.copy()
    if isinstance(outcome_columns, str):
        outcome_columns = [outcome_columns]
    # Predictor selection is delegated to the section/type filter helper.
    include_columns = get_variables_by_section_and_type(
        _data,
        dictionary,
        required_variables=required_variables,
        include_types=include_types,
        include_subjid=include_subjid,
        include_sections=include_sections,
        exclude_suffix=exclude_suffix,
    )
    # Make sure every outcome column is part of the selection.
    for outcome_column in outcome_columns:
        if outcome_column not in include_columns:
            include_columns = [outcome_column] + include_columns
    _data = _data[include_columns].dropna(axis=1, how="all").copy()
    # Convert categorical variables to onehot-encoded binary columns
    categorical_ind = dictionary["field_type"] == "categorical"
    columns = dictionary.loc[categorical_ind, "field_name"].tolist()
    columns = [col for col in columns if col not in tuple(outcome_columns)]
    _data = convert_categorical_to_onehot(
        _data, dictionary, categorical_columns=columns, drop_first=drop_first, sep=sep
    )
    binary_ind = dictionary["field_type"] == "binary"
    columns = dictionary.loc[binary_ind, "field_name"].tolist()
    columns = [col for col in columns if col in _data.columns]
    if fillna is True:
        # Opt in to pandas' future fillna downcasting behaviour to silence
        # the deprecation warning (requires pandas >= 2.2).
        with pd.option_context("future.no_silent_downcasting", True):
            _data[columns] = _data[columns].fillna(False)
    # Columns whose value suffix is a negative answer (e.g. smoking "never
    # smoked") carry no extra information once the positive columns exist.
    negative_values = ("no", "never smoked")
    negative_columns = [
        col for col in _data.columns if col.split(sep)[-1].lower() in negative_values
    ]
    if exclude_negatives:
        _data.drop(columns=negative_columns, inplace=True)
    return _data
def variance_influence_factor_backwards_elimination(
    data: pandas.DataFrame,
    dictionary: pandas.DataFrame,
    predictors_list: typing.Iterable[str],
    sep: str = "___",
) -> tuple[typing.Iterable[str], pandas.DataFrame]:
    """:py:class:`tuple` : Returns retained columns and the VIF iteration log.

    Iteratively removes the predictor with the highest variance inflation
    factor (VIF) until no remaining predictor has VIF > 10. When a onehot
    column is removed, all columns sharing its base field name (prefix before
    ``sep``) are removed together.

    Parameters
    ----------
    data : pandas.DataFrame
        Incoming data.
    dictionary : pandas.DataFrame
        Data dictionary.
    predictors_list : typing.Iterable
        Iterable of predictor variable column names.
    sep : str, default="___"
        Optional field-value separator, defaults to ``"___"``.

    Returns
    -------
    tuple
        The retained column names and a dataframe with one ``vif_iter_<n>``
        column per elimination round (NaN for already-removed variables).
    """  # noqa : E501
    _data = data.copy()
    numeric_ind = dictionary["field_type"] == "numeric"
    numeric_columns = dictionary.loc[numeric_ind, "field_name"].tolist()
    numeric_columns = [col for col in numeric_columns if col in _data.columns]
    numeric_columns = [col for col in numeric_columns if col in predictors_list]
    categorical_ind = dictionary["field_type"].isin(["binary"])
    categorical_columns = dictionary.loc[categorical_ind, "field_name"].tolist()
    categorical_columns = [col for col in categorical_columns if col in _data.columns]
    categorical_columns = [col for col in categorical_columns if col in predictors_list]
    # Standardise numeric predictors so VIF magnitudes are comparable.
    _data[numeric_columns] = (
        _data[numeric_columns] - _data[numeric_columns].mean()
    ) / _data[numeric_columns].std()
    _data = 1.0 * pd.concat(
        [_data[numeric_columns], _data[categorical_columns]], axis=1
    ).astype(float)
    keep_columns = _data.columns
    iterative_vif = pd.DataFrame(keep_columns, columns=["variable"])
    # variance_inflation_factor can emit RuntimeWarnings (e.g. division by
    # zero for perfectly collinear columns); suppress them locally.
    with warnings.catch_warnings():
        warnings.filterwarnings("ignore", category=RuntimeWarning)
        iterative_vif["vif_iter_1"] = [
            variance_inflation_factor(_data[keep_columns].values, ii)
            for ii in range(len(keep_columns))
        ]
    n = 1
    # NaN entries (already-removed variables) compare False, so the loop
    # only looks at the most recent round's VIFs.
    while (iterative_vif["vif_iter_" + str(n)] > 10).any():
        # Worst offender of the current round; idxmax skips NaN rows.
        remove_column = iterative_vif.loc[
            iterative_vif["vif_iter_" + str(n)].idxmax(), "variable"
        ]
        # Remove every onehot column that shares the same base field name.
        remove_column = remove_column.split(sep)[0]
        keep_columns = [
            col for col in keep_columns if (col.split(sep)[0] != remove_column)
        ]
        n += 1
        # Recompute VIFs over the surviving columns and record the round.
        vif = pd.DataFrame(keep_columns, columns=["variable"])
        with warnings.catch_warnings():
            warnings.filterwarnings("ignore", category=RuntimeWarning)
            vif["vif_iter_" + str(n)] = [
                variance_inflation_factor(_data[keep_columns].values, ii)
                for ii in range(len(keep_columns))
            ]
        iterative_vif = pd.merge(iterative_vif, vif, how="left", on="variable")
    return keep_columns, iterative_vif
[docs] def remove_single_binary_outcome_predictors( data: pandas.DataFrame, dictionary: pandas.DataFrame, predictors: typing.Iterable[str], outcome: str, ) -> typing.Iterable[str]: """:py:class:`typing.Iterable` : Returns a list of retained columns in the data after removing single binary outcome predictor columns. Removes binary predictors that are associated with only one outcome, e.g. if all patients with ``some_variable=1`` have ``outcome=1``. Parameters ---------- data: pandas.DataFrame The incoming data. dictionary: pandas.DataFrame The data dictionary. predictors: typing.Iterable Iterable of predictor variable column names. outcome : str The outcome string. Returns ------- typing.Iterable: List of predictor variable column names excluding any that can't be used in the logistic regression model. """ # noqa : E501 _data = data.copy() numeric_ind = dictionary["field_type"] == "numeric" numeric_columns = dictionary.loc[numeric_ind, "field_name"].tolist() numeric_columns = [col for col in numeric_columns if col in _data.columns] numeric_columns = [col for col in numeric_columns if col in predictors] categorical_ind = dictionary["field_type"].isin(["binary"]) categorical_columns = dictionary.loc[categorical_ind, "field_name"].tolist() categorical_columns = [col for col in categorical_columns if col in data.columns] categorical_columns = [col for col in categorical_columns if col in predictors] result = ( _data.groupby(outcome)[categorical_columns] .apply( lambda x: x.isin([True]).any() & x.isin([False]).any(), include_groups=False ) .T ) keep_columns = result.loc[result.all(axis=1)].index.tolist() keep_columns = keep_columns + numeric_columns return keep_columns
def regression_summary_table(
    table: pandas.DataFrame,
    dictionary: pandas.DataFrame,
    highlight_predictors: dict[str, typing.Iterable[str]] | None = None,
    pvalue_significance: float | None = None,
    result_type: str = "OddsRatio",
    sep: str = "___",
) -> pandas.DataFrame:
    """:py:class:`pandas.DataFrame` : Returns a formatted regression summary table.

    Formats univariate and multivariate results ("<estimate> (<lower>,
    <upper>)" plus formatted p-values), inserts rows for parent variables so
    grouped answers appear under their question, reorders rows to dictionary
    order, and replaces field names with formatted labels.

    Parameters
    ----------
    table : pandas.DataFrame
        The incoming table with ``Variable``, ``<result_type> (uni/multi)``,
        ``LowerCI/UpperCI (uni/multi)`` and ``p-value (uni/multi)`` columns.
    dictionary : pandas.DataFrame
        The data dictionary.
    highlight_predictors : dict, default=None
        Optional map of marker key to variable names; matched rows get the
        key appended to their label. Defaults to ``None``.
    pvalue_significance : float, default=None
        Optional :math:`p`-value significance level passed to
        ``format_pvalue``, defaults to ``None``.
    result_type : str, default="OddsRatio"
        Optional name of the estimate column, defaults to ``"OddsRatio"``.
    sep : str, default="___"
        Optional field-value separator, defaults to ``"___"``.

    Returns
    -------
    pandas.DataFrame
        Regression summary table.
    """  # noqa : E501
    # Convert variables and column names into their formatted names.
    # Fixpoint loop: repeatedly add each variable's parent until no new
    # ancestors appear, so grouped answers can be shown under their question.
    variables = table["Variable"].tolist()
    new_variables = (
        variables
        + dictionary.loc[
            (
                (dictionary["field_name"].isin(variables))
                & (dictionary["parent"].isin([""]) == 0)
            ),
            "parent",
        ].tolist()
    )
    new_variables = list(set(new_variables))
    while len(variables) != len(new_variables):
        variables = new_variables
        new_variables = (
            variables
            + dictionary.loc[
                (
                    (dictionary["field_name"].isin(variables))
                    & (dictionary["parent"].isin([""]) == 0)
                ),
                "parent",
            ].tolist()
        )
        new_variables = list(set(new_variables))
    # Reorders the variables: drop non-section parents that have only a
    # single child present (the child row alone is enough).
    dictionary = dictionary.set_index("field_name")
    nonrepeated_parent = (
        dictionary.loc[variables]
        .groupby("parent")
        .apply(len, include_groups=False)
        .eq(1)
    )
    nonrepeated_parent = [
        p
        for p in nonrepeated_parent.loc[nonrepeated_parent].index
        if (dictionary.loc[p, "field_type"] != "section")
    ]
    variables = [var for var in variables if var not in nonrepeated_parent]
    dictionary = dictionary.reset_index()
    # Re-select in dictionary order.
    variables = dictionary.loc[
        (dictionary["field_name"].isin(variables)), "field_name"
    ].tolist()
    # Collapse estimate + CI columns into a single "x.xx (l.ll, u.uu)" string
    # per regression type and format the p-values.
    for reg_type in ["multi", "uni"]:
        table[f"{result_type} ({reg_type})"] = table.apply(
            lambda x: "%.2f" % x[f"{result_type} ({reg_type})"]
            + " ("
            + "%.2f" % x[f"LowerCI ({reg_type})"]
            + ", "
            + "%.2f" % x[f"UpperCI ({reg_type})"]
            + ")",
            axis=1,
        )
        table[f"p-value ({reg_type})"] = table[f"p-value ({reg_type})"].apply(
            format_pvalue, significance=pvalue_significance
        )
        table = table.drop(columns=[f"LowerCI ({reg_type})", f"UpperCI ({reg_type})"])
    # Add empty rows for parent variables that have no results of their own,
    # then order all rows by the dictionary order computed above.
    add_variables = [var for var in variables if var not in table["Variable"].tolist()]
    add_table = pd.DataFrame("", columns=table.columns, index=range(len(add_variables)))
    add_table["Variable"] = add_variables
    table = pd.concat([table, add_table], axis=0)
    table = table.set_index("Variable").loc[variables].reset_index()
    # Optional per-row highlight markers appended to the label.
    add_key = pd.Series("", index=table.index)
    if highlight_predictors is not None:
        for key in highlight_predictors:
            add_key.loc[table["Variable"].isin(highlight_predictors[key])] += key
        add_key = add_key.apply(lambda x: x if x == "" else f" ({x})")
    # Two label styles: descriptive-table style for children of displayed
    # parents (v1), standalone style for everything else (v2).
    formatted_labels_v1 = format_descriptive_table_variables(
        dictionary, add_key=False, sep=sep
    )
    formatted_labels_v2 = format_variables(dictionary, sep=sep)
    v1_ind = dictionary["parent"].isin(variables)
    v2_ind = dictionary["parent"].isin(variables) == 0
    mapping_dict = {
        **dict(
            zip(dictionary.loc[v1_ind, "field_name"], formatted_labels_v1.loc[v1_ind])
        ),
        **dict(
            zip(dictionary.loc[v2_ind, "field_name"], formatted_labels_v2.loc[v2_ind])
        ),
    }
    table["Variable"] = table["Variable"].map(mapping_dict)
    table["Variable"] = table["Variable"] + add_key
    return table
def execute_glmm_regression(
    elr_dataframe_df: pandas.DataFrame,
    elr_outcome: str,
    elr_predictors: typing.Iterable[str],
    elr_groups: str,
    model_type: str = "linear",
    print_results: bool = True,
    labels: dict[str, str] | None = None,
    reg_type: str = "multi",
) -> pandas.DataFrame:
    """:py:class:`pandas.DataFrame` : Executes a mixed-effects regression model.

    Parameters
    ----------
    elr_dataframe_df : pandas.DataFrame
        The incoming data.
    elr_outcome : str
        Name of the response variable.
    elr_predictors : typing.Iterable
        Iterable of predictor variable names.
    elr_groups : str
        Name of the variable that defines the groups (random effect).
    model_type : str, default="linear"
        ``"linear"`` for a mixed linear model (``MixedLM``) or ``"logistic"``
        for a Bayesian mixed logistic model (``BinomialBayesMixedGLM`` fitted
        by variational Bayes); defaults to ``"linear"``.
    print_results : bool, default=True
        Whether to print the results summary, defaults to ``True``.
    labels : dict, default=None
        Optional map of variable names to readable labels, defaults to
        ``None``.
    reg_type : str, default="multi"
        ``"uni"`` for univariate, ``"multi"`` for multivariate; controls the
        suffix on output column names. Defaults to ``"multi"``.

    Returns
    -------
    pandas.DataFrame
        The model results, one row per fixed effect (intercept removed).

    Raises
    ------
    ValueError
        If ``model_type`` is neither ``"linear"`` nor ``"logistic"``.
    """  # noqa : E501
    # Work on a copy so the dtype conversions below do not mutate the
    # caller's dataframe (previously the input was modified in place).
    elr_dataframe_df = elr_dataframe_df.copy()
    # Builds the formula
    elr_formula_str = elr_outcome + " ~ " + " + ".join(elr_predictors)
    # Converts predictor columns of object/category dtype to 'category'
    elr_categorical_vars_list = elr_dataframe_df.select_dtypes(
        include=["object", "category"]
    )
    elr_categorical_vars_list = elr_categorical_vars_list.columns.intersection(
        elr_predictors
    )
    for elr_var_str in elr_categorical_vars_list:
        elr_dataframe_df[elr_var_str] = elr_dataframe_df[elr_var_str].astype("category")
    # Converts the groups column to string to ensure that the values are hashable
    elr_dataframe_df[elr_groups] = elr_dataframe_df[elr_groups].astype(str)
    if model_type.lower() == "linear":
        # Mixed linear model using MixedLM (random intercept per group)
        elr_model_obj = smf.mixedlm(
            formula=elr_formula_str,
            data=elr_dataframe_df,
            groups=elr_dataframe_df[elr_groups],
        )
        elr_result_obj = elr_model_obj.fit()
        fixed_effects = elr_result_obj.fe_params
        conf_int_df = elr_result_obj.conf_int().loc[fixed_effects.index]
        pvalues = elr_result_obj.pvalues.loc[fixed_effects.index]
        elr_summary_df = pd.DataFrame(
            {
                "Study": fixed_effects.index,
                "Coef": fixed_effects.values,
                "IC Low": conf_int_df.iloc[:, 0].values,
                "IC High": conf_int_df.iloc[:, 1].values,
                "p-value": pvalues.values,
            }
        )
    elif model_type.lower() == "logistic":
        # Mixed logistic model using BinomialBayesMixedGLM (Bayesian approach
        # via variational Bayes). Defines vc_formula for the random effect
        # (random intercept per group).
        vc_formula = {elr_groups: "0 + C({})".format(elr_groups)}
        elr_model_obj = BinomialBayesMixedGLM.from_formula(
            formula=elr_formula_str, vc_formulas=vc_formula, data=elr_dataframe_df
        )
        elr_result_obj = elr_model_obj.fit_vb()
        # Extracts the fixed effect names and determines how many there are
        param_names = elr_model_obj.exog_names
        n_fixed = len(param_names)
        fixed_effects = pd.Series(elr_result_obj.params[:n_fixed], index=param_names)
        # Attempts to obtain the covariance matrix and extracts the slice
        # corresponding to fixed effects
        try:
            cov_params = elr_result_obj.cov_params()
        except Exception:
            try:
                cov_params = elr_result_obj.vcov
            except Exception:
                cov_params = None
        if cov_params is not None:
            # If it is a DataFrame, use .iloc; otherwise, assume a NumPy array
            if hasattr(cov_params, "iloc"):
                cov_params_fixed = cov_params.iloc[:n_fixed, :n_fixed]
            else:
                cov_params_fixed = cov_params[:n_fixed, :n_fixed]
            bse = np.sqrt(np.diag(cov_params_fixed))
            bse = pd.Series(bse, index=param_names)
            # Calculates p-values manually (Wald test, normal approximation)
            z_values = fixed_effects / bse
            pvalues = 2 * (1 - norm.cdf(np.abs(z_values)))
            pvalues = pd.Series(pvalues, index=param_names)
        else:
            bse = pd.Series(np.full(fixed_effects.shape, np.nan), index=param_names)
            pvalues = pd.Series(np.full(fixed_effects.shape, np.nan), index=param_names)
        # Calculates confidence intervals using 1.96 as the quantile of the
        # normal distribution
        lower_ci = fixed_effects - 1.96 * bse
        upper_ci = fixed_effects + 1.96 * bse
        # Calculates Odds Ratios and corresponding intervals
        odds_ratios = np.exp(fixed_effects)
        odds_lower = np.exp(lower_ci)
        odds_upper = np.exp(upper_ci)
        elr_summary_df = pd.DataFrame(
            {
                # BUGFIX: use the parameter names for "Study" (mirrors the
                # linear branch above). Previously the coefficient values
                # were stored here, which broke label parsing and intercept
                # removal for logistic models.
                "Study": fixed_effects.index,
                "OddsRatio": odds_ratios.values,
                "IC Low": odds_lower.values,
                "IC High": odds_upper.values,
                "p-value": pvalues.values,
            }
        )
    else:
        raise ValueError("model_type must be 'linear' or 'logistic'")
    # Applies label mapping if provided
    if labels:

        def elr_parse_variable_name(var_name):
            # Maps e.g. "C(sex)[T.male]" -> "<label for sex> (T.male)"
            if var_name == "Intercept" or var_name.lower() == "const":
                return labels.get("Intercept", "Intercept")
            elif "[" in var_name:
                base_var = var_name.split("[")[0]
                level = var_name.split("[")[1].split("]")[0]
                base_var_name = base_var.replace("C(", "").replace(")", "").strip()
                label = labels.get(base_var_name, base_var_name)
                return f"{label} ({level})"
            else:
                var_name_clean = var_name.replace("C(", "").replace(")", "").strip()
                return labels.get(var_name_clean, var_name_clean)

        elr_summary_df["Study"] = elr_summary_df["Study"].apply(elr_parse_variable_name)
    # Removes the intercept row if present
    elr_summary_df = elr_summary_df[
        ~elr_summary_df["Study"].isin(["Intercept", "const"])
    ]
    # Reorders the columns according to the model
    if model_type.lower() == "logistic":
        elr_summary_df = elr_summary_df[
            ["Study", "OddsRatio", "IC Low", "IC High", "p-value"]
        ]
    else:
        elr_summary_df = elr_summary_df[
            ["Study", "Coef", "IC Low", "IC High", "p-value"]
        ]
    # Formats the numerical values
    if model_type.lower() == "logistic":
        elr_summary_df["OddsRatio"] = elr_summary_df["OddsRatio"].round(3)
    else:
        elr_summary_df["Coef"] = elr_summary_df["Coef"].round(3)
    elr_summary_df["IC Low"] = elr_summary_df["IC Low"].round(3)
    elr_summary_df["IC High"] = elr_summary_df["IC High"].round(3)
    elr_summary_df["p-value"] = elr_summary_df["p-value"].apply(lambda x: f"{x:.4f}")
    # Renames the columns according to the reg_type parameter
    if reg_type.lower() == "uni":
        if model_type.lower() == "logistic":
            elr_summary_df.rename(
                columns={
                    "OddsRatio": "OddsRatio (uni)",
                    "IC Low": "LowerCI (uni)",
                    "IC High": "UpperCI (uni)",
                    "p-value": "p-value (uni)",
                },
                inplace=True,
            )
        else:
            elr_summary_df.rename(
                columns={
                    "Coef": "Coef (uni)",
                    "IC Low": "LowerCI (uni)",
                    "IC High": "UpperCI (uni)",
                    "p-value": "p-value (uni)",
                },
                inplace=True,
            )
    else:
        if model_type.lower() == "logistic":
            elr_summary_df.rename(
                columns={
                    "OddsRatio": "OddsRatio (multi)",
                    "IC Low": "LowerCI (multi)",
                    "IC High": "UpperCI (multi)",
                    "p-value": "p-value (multi)",
                },
                inplace=True,
            )
        else:
            elr_summary_df.rename(
                columns={
                    "Coef": "Coef (multi)",
                    "IC Low": "LowerCI (multi)",
                    "IC High": "UpperCI (multi)",
                    "p-value": "p-value (multi)",
                },
                inplace=True,
            )
    if print_results:
        print(elr_summary_df)
    return elr_summary_df
def execute_glm_regression(
    elr_dataframe_df: pandas.DataFrame,
    elr_outcome: str,
    elr_predictors: typing.Iterable,
    model_type: str = "linear",
    print_results: bool = True,
    labels: dict[str, str] | None = None,
    reg_type: str = "Multi",
) -> pandas.DataFrame:
    """:py:class:`pandas.DataFrame` : Executes a GLM (Generalized Linear Model) for linear or logistic regression.

    Parameters
    ----------
    elr_dataframe_df : pandas.DataFrame
        The incoming data. The caller's DataFrame is not modified.
    elr_outcome : str
        Name of the response variable.
    elr_predictors : typing.Iterable
        Iterable of predictor variable names.
    model_type : str, default="linear"
        Optional regression model type - ``"linear"`` for linear
        regression (Gaussian) or ``"logistic"`` for logistic regression
        (Binomial); defaults to ``"linear"``.
    print_results : bool, default=True
        Optional indicator of whether to print the results table,
        defaults to ``True``.
    labels : dict, default=None
        Optional map of variable names to readable labels, defaults to
        ``None``.
    reg_type : str, default="Multi"
        Optional regression type - ``"uni"`` for univariate, ``"multi"``
        for multivariate; only controls the suffix appended to the
        result column names.

    Returns
    -------
    pandas.DataFrame
        The model results, one row per term, intercept removed.

    Raises
    ------
    ValueError
        If ``model_type`` is neither ``"linear"`` nor ``"logistic"``.
    """  # noqa : E501
    # Defines the family according to model_type
    if model_type.lower() == "logistic":
        family = sm.families.Binomial()
    elif model_type.lower() == "linear":
        family = sm.families.Gaussian()
    else:
        raise ValueError("model_type must be 'linear' or 'logistic'")

    # Materialize predictors once: a generator would be exhausted by the
    # join below and then yield nothing for the dtype conversion.
    elr_predictors = list(elr_predictors)

    # Builds the formula
    formula = elr_outcome + " ~ " + " + ".join(elr_predictors)

    # BUGFIX: operate on a copy so the category-dtype conversion below
    # does not mutate the caller's DataFrame in place.
    elr_dataframe_df = elr_dataframe_df.copy()
    categorical_vars = elr_dataframe_df.select_dtypes(
        include=["object", "category"]
    ).columns.intersection(elr_predictors)
    for var in categorical_vars:
        elr_dataframe_df[var] = elr_dataframe_df[var].astype("category")

    # Fits the GLM model
    model = smf.glm(formula=formula, data=elr_dataframe_df, family=family)
    result = model.fit()

    # Extracts the coefficient table (terms x statistics)
    summary_table = result.summary2().tables[1].copy()

    # For logistic regression, exponentiate coefficients into Odds Ratios;
    # for linear regression report the coefficients directly.
    if model_type.lower() == "logistic":
        summary_table["Odds Ratio"] = np.exp(summary_table["Coef."])
        summary_table["IC Low"] = np.exp(summary_table["[0.025"])
        summary_table["IC High"] = np.exp(summary_table["0.975]"])
        summary_df = summary_table[
            ["Odds Ratio", "IC Low", "IC High", "P>|z|"]
        ].reset_index()
        summary_df = summary_df.rename(
            columns={
                "index": "Study",
                "Odds Ratio": "OddsRatio",
                "IC Low": "LowerCI",
                "IC High": "UpperCI",
                "P>|z|": "p-value",
            }
        )
    else:
        summary_df = summary_table[["Coef.", "[0.025", "0.975]", "P>|z|"]].reset_index()
        summary_df = summary_df.rename(
            columns={
                "index": "Study",
                "Coef.": "Coefficient",
                "[0.025": "LowerCI",
                "0.975]": "UpperCI",
                "P>|z|": "p-value",
            }
        )

    # Maps variable names to readable labels, if provided
    if labels:

        def parse_variable_name(var_name):
            # Patsy terms look like "C(var)[T.level]" or plain "var".
            if var_name == "Intercept":
                return labels.get("Intercept", "Intercept")
            elif "[" in var_name:
                base_var = var_name.split("[")[0]
                level = var_name.split("[")[1].split("]")[0]
                base_var_name = base_var.replace("C(", "").replace(")", "").strip()
                label = labels.get(base_var_name, base_var_name)
                return f"{label} ({level})"
            else:
                var_name_clean = var_name.replace("C(", "").replace(")", "").strip()
                return labels.get(var_name_clean, var_name_clean)

        summary_df["Study"] = summary_df["Study"].apply(parse_variable_name)

    # Reorders the columns according to the model
    if model_type.lower() == "logistic":
        summary_df = summary_df[["Study", "OddsRatio", "LowerCI", "UpperCI", "p-value"]]
    else:
        summary_df = summary_df[
            ["Study", "Coefficient", "LowerCI", "UpperCI", "p-value"]
        ]

    # Removes the 'T.' treatment-coding marker from categorical levels.
    # BUGFIX: regex=False pins the intended literal replacement ('.' is
    # not a wildcard here).
    summary_df["Study"] = summary_df["Study"].str.replace("T.", "", regex=False)

    # Formats the numerical values (all columns between "Study" and "p-value")
    for col in summary_df.columns[1:-1]:
        summary_df[col] = summary_df[col].round(3)
    summary_df["p-value"] = summary_df["p-value"].apply(lambda x: f"{x:.4f}")

    # Removes the intercept row
    summary_df = summary_df[summary_df["Study"] != "Intercept"]

    # Suffixes the result columns according to the regression type.
    # Collapses the original four identical rename branches; behavior is
    # unchanged (unknown reg_type values still leave columns unsuffixed).
    if reg_type.lower() in ("uni", "multi"):
        suffix = f" ({reg_type.lower()})"
        value_col = "OddsRatio" if model_type.lower() == "logistic" else "Coefficient"
        summary_df.rename(
            columns={
                value_col: value_col + suffix,
                "LowerCI": "LowerCI" + suffix,
                "UpperCI": "UpperCI" + suffix,
                "p-value": "p-value" + suffix,
            },
            inplace=True,
        )

    if print_results:
        print(summary_df)
    return summary_df
def execute_cox_model(
    data: pandas.DataFrame,
    duration_col: str,
    event_col: str,
    predictors: typing.Iterable[str],
    labels: dict[str, str] | None = None,
) -> pandas.DataFrame:
    """:py:class:`pandas.DataFrame` : Executes a Cox Proportional Hazards model without weights and returns a summary of the results.

    Parameters
    ----------
    data : pandas.DataFrame
        The incoming data. The caller's DataFrame is not modified.
    duration_col : str
        Name of the time variable.
    event_col : str
        Name of the outcome variable (binary event).
    predictors : typing.Iterable
        Names of predictor variables.
    labels : dict, default=None
        Dictionary mapping variable names to readable labels, default=``None``.

    Returns
    -------
    pandas.DataFrame
        The model results: variable name, hazard ratio, p-value and 95% CI.
    """  # noqa : E501
    # BUGFIX: operate on a copy so the category conversion below does not
    # mutate the caller's DataFrame in place.
    data = data.copy()
    # Materialize predictors once so membership tests below are reliable.
    predictors = list(predictors)

    # Ensure categorical variables are treated appropriately
    categorical_vars = data.select_dtypes(
        include=["object", "category"]
    ).columns.intersection(predictors)
    for var in categorical_vars:
        data[var] = data[var].astype("category")

    # Convert categorical variables to dummies
    # BUGFIX: was `pd.get_dummies` etc., but this module imports `pandas`
    # without the `pd` alias.
    data = pandas.get_dummies(data, columns=categorical_vars, drop_first=True)

    # Ensure numerical variables have the correct type
    data[duration_col] = pandas.to_numeric(data[duration_col], errors="coerce")
    data[event_col] = pandas.to_numeric(data[event_col], errors="coerce")

    # Update predictors to include one-hot encoded columns
    predictors = [
        c
        for c in data.columns
        if c in predictors or any(c.startswith(p + "_") for p in categorical_vars)
    ]

    # Remove rows with missing values in essential columns
    data = data.dropna(subset=[duration_col, event_col] + predictors)

    # Select relevant columns
    df_cox = data[[duration_col, event_col] + predictors]

    # Fit the Cox model
    cph = CoxPHFitter()
    cph.fit(df_cox, duration_col=duration_col, event_col=event_col)

    # Model summary: hazard ratios and 95% Wald confidence intervals
    summary = cph.summary
    summary["HR"] = np.exp(summary["coef"])
    summary["CI_lower"] = np.exp(summary["coef"] - 1.96 * summary["se(coef)"])
    summary["CI_upper"] = np.exp(summary["coef"] + 1.96 * summary["se(coef)"])
    summary["p_adj"] = summary["p"].apply(lambda p: round(p, 3))

    # Select relevant columns for the final summary
    summary_df = summary[["HR", "p_adj", "CI_lower", "CI_upper"]].reset_index()
    # BUGFIX: lifelines names the summary index "covariate", so renaming
    # "index" never matched; rename the first column whatever it is called.
    summary_df.rename(
        columns={summary_df.columns[0]: "Variable", "p_adj": "p-value"},
        inplace=True,
    )

    # Replace variable labels if provided
    if labels:
        summary_df["Variable"] = (
            summary_df["Variable"].map(labels).fillna(summary_df["Variable"])
        )
    return summary_df
def execute_kaplan_meier(
    data: pandas.DataFrame,
    duration_col: str,
    event_col: str,
    group_col: str,
    alpha=0.05,
    n_times=5,
) -> tuple[pandas.DataFrame, pandas.DataFrame, float]:
    """:py:class:`tuple` : Executes the Kaplan-Meier model and returns the results.

    Parameters
    ----------
    data : pandas.DataFrame
        The incoming data.
    duration_col : str
        Name of the time variable.
    event_col : str
        Name of the outcome variable (binary event).
    group_col : str
        Name of the grouping column.
    alpha : float, default=0.05
        Optional alpha for the confidence intervals, defaults to :math:`0.05`.
    n_times : int, default=5
        Optional step (in time units) between risk-table time points,
        defaults to :math:`5`.

    Returns
    -------
    tuple
        A tuple consisting of the survival curves (percent, with CI bounds
        per group), the risk table and the log-rank :math:`p`-value
        (``nan`` when there is only one group).
    """  # noqa : E501
    # Remove rows with missing values in relevant columns
    data = data.dropna(subset=[duration_col, event_col, group_col])
    kmf = KaplanMeierFitter()
    unique_groups = data[group_col].sort_values().unique()
    # BUGFIX: was `pd.DataFrame` etc., but this module imports `pandas`
    # without the `pd` alias.
    df_km = pandas.DataFrame(columns=["timeline"])
    # NOTE(review): this horizon formula (max // step * (step + 1)) looks
    # odd; confirm the intended risk-table end point against callers.
    max_time = (data[duration_col].max() // n_times) * (n_times + 1)
    times = np.arange(0, max_time, n_times)

    # Compute survival curves and confidence intervals for each group
    for group in unique_groups:
        group_data = data[data[group_col] == group]
        kmf.fit(
            group_data[duration_col],
            event_observed=group_data[event_col],
            label=str(group),
            alpha=alpha,
        )
        # lifelines names the CI columns "<label>_lower_<1-alpha>"
        ci_lower = kmf.confidence_interval_[f"{group}_lower_{1 - alpha}"] * 100
        ci_upper = kmf.confidence_interval_[f"{group}_upper_{1 - alpha}"] * 100
        survival_curve = pandas.concat(
            [kmf.survival_function_ * 100, ci_lower, ci_upper], axis=1
        )
        survival_curve = (
            survival_curve.drop_duplicates()
            .reset_index()
            .rename(columns={"index": "timeline"})
        )
        # Outer-merge on the shared timeline, backfilling step curves
        df_km = pandas.merge(df_km, survival_curve, on="timeline", how="outer").bfill()

    # Perform log-rank test (pairwise for two groups, multivariate otherwise)
    if len(unique_groups) == 2:
        group1_data = data[data[group_col] == unique_groups[0]]
        group2_data = data[data[group_col] == unique_groups[1]]
        result = logrank_test(
            group1_data[duration_col],
            group2_data[duration_col],
            event_observed_A=group1_data[event_col],
            event_observed_B=group2_data[event_col],
        )
        p_value = result.p_value
    elif len(unique_groups) > 2:
        result = multivariate_logrank_test(
            data[duration_col], data[group_col], data[event_col]
        )
        p_value = result.p_value
    else:
        p_value = np.nan

    # Generate risk table: number of individuals at risk over time
    risk_counts = {
        group: [
            ((data[group_col] == group) & (data[duration_col] >= t)).sum()
            for t in times
        ]
        for group in unique_groups
    }
    risk_table = pandas.DataFrame(risk_counts, index=times).T
    risk_table.insert(0, "Group", risk_table.index)
    risk_table.reset_index(drop=True, inplace=True)
    return df_km, risk_table, p_value
############################################ ############################################ # FEATURE SELECTION # (includes basic imputation, variance check, correlation check) ############################################ ############################################
def impute_miss_val(
    data: pandas.DataFrame,
    dictionary: pandas.DataFrame,
    outcome_column: str = "outco_binary_outcome",
    missing_threshold: float = 0.7,
    verbose: bool = False,
) -> pandas.DataFrame:
    """:py:class:`pandas.DataFrame` : The data with missing values imputed.

    Imputes missing values or drops columns based on missing value
    proportion: columns with a missing fraction above ``missing_threshold``
    are dropped; remaining numeric columns are filled with their median,
    binary/categorical columns with their mode.

    Parameters
    ----------
    data : pandas.DataFrame
        The incoming data.
    dictionary : pandas.DataFrame
        The data dictionary (``field_name`` / ``field_type`` columns).
    outcome_column : str, default="outco_binary_outcome"
        Optional outcome column, defaults to ``"outco_binary_outcome"``.
    missing_threshold : float, default=0.7
        A proportional imputation threshold for missing values, defaults
        to :math:`0.7`.
    verbose : bool, default=False
        Optional indicator of whether to print the imputation summary,
        defaults to ``False``.

    Returns
    -------
    pandas.DataFrame
        Data with missing values imputed or columns dropped.
    """  # noqa : E501
    keep_columns = ["subjid", outcome_column]
    numeric_ind = dictionary["field_type"] == "numeric"
    numeric_columns = dictionary.loc[numeric_ind, "field_name"].tolist()
    numeric_columns = [col for col in numeric_columns if col in data.columns]
    binary_ind = dictionary["field_type"] == "binary"
    binary_columns = dictionary.loc[binary_ind, "field_name"].tolist()
    binary_columns = [col for col in binary_columns if col in data.columns]
    # BUGFIX: was .isin(["binary"]), which merely duplicated binary_columns
    # and left true categorical fields unimputed.
    # NOTE(review): assumes the dictionary uses "categorical" as a
    # field_type value — confirm against the data dictionary schema.
    categorical_ind = dictionary["field_type"].isin(["categorical"])
    categorical_columns = dictionary.loc[categorical_ind, "field_name"].tolist()
    categorical_columns = [col for col in categorical_columns if col in data.columns]

    # Calculate the proportion of missing values in each column
    missing_proportions = data.isnull().mean()

    # Identify columns to drop (never the ID or outcome columns)
    columns_to_drop = missing_proportions[
        (missing_proportions > missing_threshold)
    ].index.tolist()
    columns_to_drop = [col for col in columns_to_drop if col not in keep_columns]
    data = data.drop(columns=columns_to_drop)

    # Impute missing values in remaining columns
    impute_columns = [col for col in data.columns if col not in keep_columns]
    for col in impute_columns:
        if col in numeric_columns:
            data[col] = data[col].fillna(data[col].median())
        if col in (binary_columns + categorical_columns):
            mode = data[col].mode()
            # Guard: mode() is empty when the column is entirely NaN
            # (possible when missing_threshold >= 1).
            if not mode.empty:
                data[col] = data[col].fillna(mode.values[0])

    if verbose:
        print("\nSummary after Imputation")
        print("Size of remaining data:", data.shape)
    return data
def rmv_low_var(
    data: pandas.DataFrame,
    dictionary: pandas.DataFrame,
    mad_threshold: float = 0.1,
    freq_threshold: float = 0.05,
    outcome_column: str = "outco_binary_outcome",
    verbose: bool = False,
) -> pandas.DataFrame:
    """:py:class:`pandas.DataFrame` : Removes numerical variables from the data with Median Absolute Deviation (MAD) below a given threshold.

    Excludes binary columns from MAD calculation. Removes binary columns
    with very low frequencies and single-valued columns.

    Parameters
    ----------
    data : pandas.DataFrame
        The incoming data.
    dictionary : pandas.DataFrame
        The data dictionary (``field_name`` / ``field_type`` columns).
    mad_threshold : float, default=0.1
        Optional MAD threshold, defaults to :math:`0.1`.
    freq_threshold : float, default=0.05
        Optional frequency threshold for the minority class of binary
        columns, defaults to :math:`0.05`.
    outcome_column : str, default="outco_binary_outcome"
        Optional outcome column, defaults to ``"outco_binary_outcome"``.
    verbose : bool, default=False
        Optional indicator of whether to print the MAD analysis summary.

    Returns
    -------
    pandas.DataFrame
        The data with single-valued, low-frequency-binary and low-MAD
        columns removed.
    """  # noqa : E501
    keep_columns = ["subjid", outcome_column]
    numeric_ind = dictionary["field_type"] == "numeric"
    numeric_columns = dictionary.loc[numeric_ind, "field_name"].tolist()
    numeric_columns = [col for col in numeric_columns if col in data.columns]
    binary_ind = dictionary["field_type"] == "binary"
    binary_columns = dictionary.loc[binary_ind, "field_name"].tolist()
    binary_columns = [col for col in binary_columns if col in data.columns]

    # Remove single-valued columns first
    single_value_columns = data.columns[data.nunique() == 1]
    single_value_columns = [
        col for col in single_value_columns if col not in keep_columns
    ]
    data = data.drop(columns=single_value_columns)

    # BUGFIX: refresh the per-type column lists after the drop above;
    # indexing data[...] with a just-dropped column raised KeyError.
    numeric_columns = [col for col in numeric_columns if col in data.columns]
    binary_columns = [col for col in binary_columns if col in data.columns]

    # Minority-class frequency of each binary column
    binary_counts = data[binary_columns].mean(axis=0)
    binary_counts = binary_counts.apply(lambda x: min((x, 1 - x)))
    exclude_binary_columns = [
        col for col in binary_columns if (binary_counts[col] < freq_threshold)
    ]

    # MAD of each numeric column, after scaling to its max absolute value
    mad_series = (
        data[numeric_columns]
        .apply(lambda x: x / x.abs().max(), axis=0)
        .apply(lambda x: np.median(np.abs(x - np.median(x))))
    )
    exclude_numeric_columns = [
        col for col in numeric_columns if (mad_series[col] < mad_threshold)
    ]

    # Keep everything except the low-frequency-binary and low-MAD columns
    exclude_columns = exclude_binary_columns + exclude_numeric_columns
    keep_columns = [col for col in data.columns if col not in exclude_columns]
    data = data[keep_columns]

    if verbose:
        print("\nMAD Analysis Summary:")
        print(f"Single value columns removed: {len(single_value_columns)}")
        print(f"Total binary columns: {len(binary_columns)}")
        print(f"Total numeric columns: {len(numeric_columns)}")
        print(f"""High Frequency Binary columns kept: \
{len(binary_columns) - len(exclude_binary_columns)}""")
        # BUGFIX: previously printed the number of columns KEPT under this
        # "removed" label.
        print(f"""Columns removed due to low MAD: \
{len(exclude_numeric_columns)}""")
    return data
def rmv_high_corr(
    data: pandas.DataFrame,
    dictionary: pandas.DataFrame,
    outcome_column: str = "outco_binary_outcome",
    correlation_threshold: float = 0.5,
    verbose: bool = False,
) -> pandas.DataFrame:
    """:py:class:`pandas.DataFrame` : Removes variables in the data with high multicollinearity.

    When the absolute pairwise correlation of two numeric variables
    exceeds the threshold, the second member of the pair is arbitrarily
    dropped.

    Parameters
    ----------
    data : pandas.DataFrame
        The incoming data.
    dictionary : pandas.DataFrame
        The data dictionary (``field_name`` / ``field_type`` columns).
    outcome_column : str, default="outco_binary_outcome"
        Optional outcome column, defaults to ``"outco_binary_outcome"``.
    correlation_threshold : float, default=0.5
        Optional correlation threshold, defaults to :math:`0.5`.
    verbose : bool, default=False
        Optional indicator of whether to print the correlation summary.

    Returns
    -------
    pandas.DataFrame
        The data with high correlation variables removed.
    """  # noqa : E501
    protected = ["subjid", outcome_column]

    # Numeric variables declared in the dictionary and present in the data
    is_numeric = dictionary["field_type"] == "numeric"
    numeric_cols = [
        col
        for col in dictionary.loc[is_numeric, "field_name"].tolist()
        if col in data.columns
    ]

    # Absolute pairwise correlations between numeric columns
    abs_corr = data[numeric_cols].corr().abs()

    # Keep only the strict upper triangle so each pair appears exactly once,
    # then keep only the pairs above the threshold
    upper_mask = np.triu(np.ones(abs_corr.shape), 1).astype(bool)
    high_pairs = (
        abs_corr.where(upper_mask)
        .where(lambda c: c > correlation_threshold)
        .stack()
        .reset_index()
    )

    # Arbitrarily drop the second member ("level_1") of each correlated pair,
    # never touching the ID or outcome columns
    drop_cols = [
        col for col in high_pairs["level_1"].unique().tolist() if col not in protected
    ]
    data = data.drop(columns=drop_cols)

    if verbose:
        print("\nCORR Summary")
        print(f"Columns removed due to high correlation: \
{len(drop_cols)}")
    return data
def lasso_var_sel_binary(
    data: pandas.DataFrame,
    outcome_column: str = "mapped_outcome",
    metric: str = "balanced_accuracy",
    threshold: float = 1e-3,
    # NOTE(review): mutable default argument, and this parameter is never
    # read below (l1_vec / C_vec are hard-coded, although numerically equal
    # to these defaults) — confirm whether it should be wired in.
    gridsearch_params: dict[str, typing.Iterable[float]] = {
        "l1_ratios": [0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9],
        "Cs": [1e-3, 3.16e-3, 1e-2, 3.16e-2, 1e-1, 3.16e-1, 1, 3.16, 10],
    },
    random_state: int = 42,
    verbose: bool = False,
    sep: str = "___",
) -> tuple[typing.Any]:
    """:py:class:`tuple` : Prepares data and selects features using binary logistic regression with elastic net penalty.

    Specifically designed for binary outcomes only.

    Parameters
    ----------
    data : pandas.DataFrame
        The incoming data.
    outcome_column : str, default="mapped_outcome"
        Optional outcome column, defaults to ``"mapped_outcome"``.
    metric : str, default="balanced_accuracy"
        Optional metric, defaults to ``"balanced_accuracy"``.
    threshold : float, default=1e-3
        Optional threshold, defaults to :math:`0.001`.
    gridsearch_params : dict
        Optional grid search params, defaults to: ::

            {
                "l1_ratios": [0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9],
                "Cs": [1e-3, 3.16e-3, 1e-2, 3.16e-2, 1e-1, 3.16e-1, 1, 3.16, 10]
            }

    random_state : int, default=42
        Optional random state, defaults to :math:`42`.
    verbose : bool, default=False
        Optional indicator of whether to print the analysis, defaults to
        ``False``.
    sep : str, default="___"
        Optional field-value separator, defaults to ``"___"``.

    Returns
    -------
    tuple
        Results tuple.
    """  # noqa : E501
    # NOTE(review): this body uses `pd.`, but the visible module header only
    # has `import pandas` — presumably the real module aliases it as `pd`;
    # verify against the full import block.
    if outcome_column not in data.columns:
        error_str = f"Outcome column {outcome_column} not found in DataFrame"
        raise ValueError(error_str)

    # Separate predictors and outcome
    y = data[outcome_column].copy()
    X_ini = data.drop(columns=[outcome_column])
    if verbose:
        print(f"\nInitial shape of X: {X_ini.shape}")

    # Encode the binary outcome as 0/1 integer labels
    label_encoder = LabelEncoder()
    y = label_encoder.fit_transform(y)

    # Verify that we have a binary outcome
    n_classes = len(np.unique(y))
    if n_classes != 2:
        error_str = """This function is designed for binary classification \
only. More than two classes found."""
        raise ValueError(error_str)

    # Standardize numeric features only; other columns pass through unchanged
    scaler = StandardScaler()
    numeric_cols = X_ini.select_dtypes(include=[np.number]).columns
    if verbose:
        print("Column dtypes:", X_ini.dtypes)
        print("Numeric columns found:", len(numeric_cols))
    X = X_ini.copy()
    if len(numeric_cols) > 0:
        X[numeric_cols] = scaler.fit_transform(X_ini[numeric_cols])
    else:
        if verbose:
            print("No numeric columns to standardize")
    X = pd.DataFrame(X, columns=X_ini.columns)
    if verbose:
        outcome_classes = dict(
            zip(label_encoder.classes_, range(len(label_encoder.classes_)))
        )
        print("\nOutcome classes:", outcome_classes)
        print(f"\nInitial shape of X: {X.shape}")

    # Encode categorical predictors: label-encode two-level columns,
    # one-hot encode columns with more than two levels
    categorical_cols = X.select_dtypes(include=["object", "category"]).columns
    binary_cats = [col for col in categorical_cols if X[col].nunique() == 2]
    multi_cats = [col for col in categorical_cols if X[col].nunique() > 2]
    for col in binary_cats:
        le = LabelEncoder()
        X[col] = le.fit_transform(X[col])
    # Use dummies only for multi-category
    X = pd.get_dummies(X, columns=multi_cats, prefix_sep=sep)
    if verbose:
        print(f"\nshape of X after one-hot: {X.shape}")
    if X.shape[1] > 0:
        if verbose:
            print("First actual predictor column:", X.columns[0])
    else:
        error_str = """No predictor columns left after dropping outcome (and \
ID if applicable)."""
        raise ValueError(error_str)

    # Fit binary logistic regression with elastic net
    # For binary classification, multi_class defaults to 'ovr', which yields a
    # single set of coefficients.
    l1_vec = [0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9]
    # np.logspace(-3, 1, 9) reproduces the Cs listed in the default
    # gridsearch_params (1e-3 ... 10).
    C_vec = np.logspace(-3, 1, 9)
    logistic = LogisticRegressionCV(
        penalty="elasticnet",
        l1_ratios=l1_vec,
        solver="saga",
        cv=StratifiedKFold(n_splits=3, shuffle=True, random_state=random_state),
        random_state=random_state,
        max_iter=5000,
        class_weight="balanced",
        Cs=C_vec,
        tol=1e-4,
        scoring=metric,
    )
    # Below is the original way we started
    logistic.fit(X, y)
    if verbose:
        print("Original l1_ratios specified:", l1_vec)
        print("Model's l1_ratios attribute:", logistic.l1_ratios)
        print("Shape of logistic.scores_[1]:", logistic.scores_[1].shape)
        print("\nFirst 10 scores for each row:")
        for i in [0, 1]:  # explicitly loop through both rows
            print(f"\nRow {i} (supposed to be l1_ratio = {l1_vec[i]}):")
            print(logistic.scores_[1][0, :10, i])

    # Mean CV score per (C, l1_ratio) cell, averaged across folds
    scores_dt = np.mean(logistic.scores_[1], axis=0)
    if verbose:
        print("Mean scores for varying l1_ratio and Cs")
        print(scores_dt)  # print matrix for this fold
    scores_df = pd.DataFrame(
        scores_dt,  # Transpose to get l1_ratios as rows
        index=C_vec,
        columns=l1_vec,
    )
    # Label the axes
    scores_df.index.name = "C"
    scores_df.columns.name = "l1_ratio"

    # logistic.coef_ will have shape (1, n_features) for binary classification
    coef_df = pd.DataFrame(logistic.coef_, columns=X.columns)
    # No indexing by classes since it's binary (one row of coefficients)
    # Compute feature importance as absolute value of coefficients
    # Since there's only one class row, mean across rows is just that row
    feature_importance = np.abs(coef_df.iloc[0, :])
    # Select features with non-zero importance
    selected_features = feature_importance[(feature_importance > 0.0)].index.tolist()

    # Predictions (in-sample; used for the report/confusion matrix below)
    y_pred = logistic.predict(X)

    # Performance metrics
    # Find the best C and corresponding CV score
    best_c = logistic.C_[0]
    c_index = np.where(logistic.Cs_ == best_c)[0][0]
    all_class_scores = []
    for cl in logistic.scores_:
        all_class_scores.extend(logistic.scores_[cl][:, c_index])
    best_cv_score = np.mean(all_class_scores)
    conf_matrix = confusion_matrix(y, y_pred)
    target_names = [str(c) for c in label_encoder.classes_]
    X_selected = X[selected_features]
    if verbose:
        print("\nPerformance Metrics:")
        print("-------------------")
        print(f"Best C value: {best_c}")
        print("\nConfusion Matrix:")
        print(conf_matrix)
        print("\nClassification Report:")
        print(classification_report(y, y_pred, target_names=target_names))
        print(f"Best CV score: {best_cv_score}")
        # l1_ratio_ returns the best ratio found for each class. For binary,
        # there should be one:
        print(f"Best l1_ratio: {logistic.l1_ratio_[0]}")
        print(f"\nSelected {len(selected_features)} features")
        # Print feature importance for selected features
        print("\nFeature importance for selected features:")
        sorted_features = sorted(
            selected_features, key=lambda x: feature_importance[x], reverse=True
        )
        for feat in sorted_features:
            print(f"{feat}: {feature_importance[feat]:.4f}")
        print(f"Final shape of selected features: {X_selected.shape}")

    # Store metrics in a dictionary
    report = classification_report(
        y, y_pred, target_names=label_encoder.classes_, output_dict=True
    )
    metrics = {
        "confusion_matrix": conf_matrix,
        "classification_report": report,
        "accuracy": accuracy_score(y, y_pred),
        "cv_scores": all_class_scores,
    }

    # Collapse one-hot features back to their original field, keeping the
    # largest absolute coefficient per field
    original_features = {}
    for feature in selected_features:
        if sep in feature:
            orig_name = feature.split(sep)[0]
            coef = abs(feature_importance[feature])
            original_features[orig_name] = max(
                original_features.get(orig_name, 0), coef
            )
        else:
            original_features[feature] = abs(feature_importance[feature])

    # Create grouped results showing all categories
    grp_results = create_grouped_results(selected_features, feature_importance)
    results_df = grp_results[0]
    sorted_fields = grp_results[1]
    categorical_fields = grp_results[2]
    if verbose:
        # Print the grouped results
        print("\nSelected features grouped by main field:")
        print(results_df)
    # # Save to CSV
    # results_df.to_csv('feature_coefficients_grouped.csv', index=False)
    # Note: We may still want to keep the original X_selected DataFrame for
    # further analysis
    X_selected = X[selected_features]
    # print(f"Final shape of selected features: {X_selected.shape}")

    # After running create_grouped_results, extract main fields
    # NOTE(review): both branches of this if/else append the same value —
    # presumably a placeholder for differing treatment; confirm intent.
    main_fields = []
    for field in sorted_fields:  # We already have sorted_fields from earlier
        if field in categorical_fields:
            main_fields.append(field)
        else:
            main_fields.append(field)  # For regular features
    # Convert main_fields list to a single-column DataFrame
    main_fields_df = pd.DataFrame({"Main Features": main_fields})
    top_params = get_parameter_ranking(logistic, n_top=20, threshold=threshold)
    if verbose:
        print(main_fields_df)
        print("\nTop parameter combinations:")
        print(top_params)
    return (
        results_df,
        scores_df,
        main_fields_df,
        top_params,
        X_selected,
        y,
        selected_features,
        coef_df,
        label_encoder,
        feature_importance,
        metrics,
    )
def create_grouped_results(
    selected_features: typing.Iterable[str],
    feature_importance: dict[str, float],
    sep: str = "___",
) -> tuple[pandas.DataFrame, typing.Iterable[str], typing.Iterable[str]]:
    """:py:class:`tuple` : Creates and returns grouped feature results.

    The main DataFrame lists categories under their main fields, with main
    fields sorted by their maximum coefficient magnitude.

    Parameters
    ----------
    selected_features : typing.Iterable
        An iterable of selected feature names; one-hot encoded features
        are recognised by the ``sep`` infix (``field<sep>category``).
    feature_importance : dict
        A mapping of feature names to coefficient weights.
    sep : str, default="___"
        Optional field-value separator, defaults to ``"___"``.

    Returns
    -------
    tuple
        ``(results_df, sorted_fields, categorical_fields)`` where
        ``results_df`` has ``Feature`` / ``Coefficient`` columns,
        ``sorted_fields`` is the field order (by max |coef|) and
        ``categorical_fields`` is the set of one-hot encoded fields.
    """  # noqa : E501
    # Step 1: Identify one-hot encoded and regular features
    categorical_fields = set()
    regular_features = []
    for feature in selected_features:
        if sep in feature:
            # One-hot encoded feature
            categorical_fields.add(feature.split(sep)[0])
        else:
            # Regular feature
            regular_features.append(feature)

    # Step 2: Organize features by main field
    field_groups = {}

    # Process categorical fields: collect every selected category per field
    for field in categorical_fields:
        field_groups[field] = []
        for feature in selected_features:
            if feature.startswith(field + sep):
                category = feature.split(sep)[1]
                coef = feature_importance[feature]
                field_groups[field].append(
                    {"Feature": category, "Coefficient": coef, "AbsCoef": abs(coef)}
                )

    # Process regular features: a single entry holding the coefficient
    for feature in regular_features:
        coef = feature_importance[feature]
        field_groups[feature] = [
            {"Feature": "", "Coefficient": coef, "AbsCoef": abs(coef)}
        ]

    # Get max coefficient for each field for sorting
    field_max_coef = {}
    for field, categories in field_groups.items():
        field_max_coef[field] = max(cat["AbsCoef"] for cat in categories)

    # Sort fields by their max coefficient
    sorted_fields = sorted(
        field_groups.keys(), key=lambda f: field_max_coef[f], reverse=True
    )

    # Create the final DataFrame with the desired structure
    results = []
    for field in sorted_fields:
        if field in categorical_fields:
            # For categorical fields, add header row with no coefficient
            results.append({"Feature": field, "Coefficient": "..."})
            # Add all categories, strongest first
            categories = sorted(
                field_groups[field], key=lambda x: x["AbsCoef"], reverse=True
            )
            for cat in categories:
                results.append(
                    {
                        # Indent for visual grouping
                        "Feature": " " + cat["Feature"],
                        "Coefficient": cat["Coefficient"],
                    }
                )
        else:
            # For regular features, show coefficient directly
            results.append(
                {"Feature": field, "Coefficient": field_groups[field][0]["Coefficient"]}
            )
    # BUGFIX: was `pd.DataFrame`, but this module imports `pandas` without
    # the `pd` alias.
    return pandas.DataFrame(results), sorted_fields, categorical_fields
def get_parameter_ranking(
    logistic: typing.Any, n_top: int = 10, threshold: float = 1e-3
) -> pandas.DataFrame:
    """:py:class:`pandas.DataFrame` : Returns a dataframe of rankings of parameter combinations using stored scores and coefficient paths.

    Parameters
    ----------
    logistic : typing.Any
        A fitted ``LogisticRegressionCV``-like model exposing
        ``l1_ratios_``, ``Cs_``, ``scores_`` and ``coefs_paths_``
        (elastic-net layout: folds x Cs x l1_ratios [x features]).
    n_top : int, default=10
        Optional number of top-ranking parameter combinations to keep,
        defaults to :math:`10`.
    threshold : float, default=1e-3
        Optional absolute-coefficient threshold for counting a feature as
        selected, defaults to :math:`0.001`.

    Returns
    -------
    pandas.DataFrame
        One row per retained ``(l1_ratio, C)`` combination with its mean
        CV score and the per-fold selected-feature counts, sorted by
        score and de-duplicated on ``n_features0``.
    """  # noqa : E501
    # Create empty list to store parameter information
    param_scores = []
    # Get model parameters
    l1_ratios = logistic.l1_ratios_
    Cs = logistic.Cs_
    # Get first class key (for binary classification,
    # there's only one set of scores)
    first_class = list(logistic.scores_.keys())[0]
    # Loop through all parameter combinations
    for l1_ratio_idx, l1_ratio in enumerate(l1_ratios):
        for C_idx, C in enumerate(Cs):
            # Get mean score across folds for this parameter combination
            score = logistic.scores_[first_class][:, C_idx, l1_ratio_idx].mean()
            # Get coefficients of the first three CV folds for this
            # parameter combination (assumes n_splits=3, as used by the
            # companion lasso_var_sel_binary fit).
            coef0 = logistic.coefs_paths_[first_class][0, C_idx, l1_ratio_idx, :]
            coef1 = logistic.coefs_paths_[first_class][1, C_idx, l1_ratio_idx, :]
            coef2 = logistic.coefs_paths_[first_class][2, C_idx, l1_ratio_idx, :]
            # Count non-zero (above-threshold) coefficients per fold
            n_features0 = np.sum(np.abs(coef0) > threshold)
            n_features1 = np.sum(np.abs(coef1) > threshold)
            n_features2 = np.sum(np.abs(coef2) > threshold)
            # Append to our list
            param_scores.append(
                {
                    "l1_ratio": l1_ratio,
                    "C": C,
                    "score": score,
                    "n_features0": n_features0,
                    "n_features1": n_features1,
                    "n_features2": n_features2,
                }
            )
    # Convert to DataFrame and sort.
    # BUGFIX: was `pd.DataFrame`, but this module imports `pandas` without
    # the `pd` alias.
    params_df = pandas.DataFrame(param_scores)
    params_df = (
        params_df.sort_values("score", ascending=False)
        .drop_duplicates("n_features0")
        .head(n_top)
    )
    return params_df