from __future__ import annotations
__all__ = [
"convert_categorical_to_onehot",
"convert_onehot_to_categorical",
"create_grouped_results",
"descriptive_comparison_table",
"descriptive_table",
"execute_cox_model",
"execute_glm_regression",
"execute_glmm_regression",
"execute_kaplan_meier",
"extend_dictionary",
"format_descriptive_table_variables",
"format_pvalue",
"format_variables",
"from_timeA_to_timeB",
"get_chi2_pvalue",
"get_counts",
"get_descriptive_data",
"get_fisher_exact_pvalue",
"get_mean_and_stdev",
"get_median_interquartile_range",
"get_modelling_data",
"get_n_percent_value",
"get_parameter_ranking",
"get_proportions",
"get_pyramid_data",
"get_upset_counts_intersections",
"get_variables_by_section_and_type",
"impute_miss_val",
"lasso_var_sel_binary",
"mannwhitneyu",
"mean_std_str",
"median_iqr_str",
"n_percent_str",
"norm",
"regression_summary_table",
"remove_single_binary_outcome_predictors",
"rmv_high_corr",
"rmv_low_var",
"trim_field_label",
"variance_influence_factor_backwards_elimination",
]
# -- IMPORTS --
# -- Standard libraries --
import typing
import warnings
# -- 3rd party libraries --
import numpy as np
import pandas
import statsmodels.api as sm
import statsmodels.formula.api as smf
from lifelines import CoxPHFitter, KaplanMeierFitter
from lifelines.statistics import logrank_test, multivariate_logrank_test
from scipy.stats import chi2_contingency, fisher_exact, mannwhitneyu, norm
from sklearn.linear_model import LogisticRegressionCV
from sklearn.metrics import accuracy_score, classification_report, confusion_matrix
from sklearn.model_selection import StratifiedKFold
from sklearn.preprocessing import LabelEncoder, StandardScaler
from statsmodels.genmod.bayes_mixed_glm import BinomialBayesMixedGLM
from statsmodels.stats.outliers_influence import variance_inflation_factor
# -- Internal libraries --
pd = pandas # An alias to allow Pandas code refs to work independently
# of Pandas Intersphinx refs in type hinting and docstrings
############################################
############################################
# General preprocessing
############################################
############################################
[docs]
def extend_dictionary(
    dictionary: pandas.DataFrame,
    new_variable_dict: dict[str, typing.Any],
    data: pandas.DataFrame,
    sep: str = "___",
) -> pandas.DataFrame:
    """:py:class:`pandas.DataFrame` : Returns the VERTEX dictionary with new custom variables added.

    New rows are positioned via a temporary fractional ``"index"`` sort key
    (parent index + 0.1, 0.2, ...) so that, after a final ``sort_values``,
    each new variable appears directly after its parent.

    Parameters
    ----------
    dictionary : pandas.DataFrame
        VERTEX dictionary containing columns ``"field_name"``, ``"form_name"``,
        ``"field_type"``, ``"field_label"``, ``"parent"``, ``"branching_logic"``.
    new_variable_dict : dict
        A dict with the same keys as the dictionary columns, the values for
        each item can be a string or a list.
    data : pandas.DataFrame
        Pandas dataframe containing the data for the project. The columns of
        this dataframe must include the variables in
        ``new_variable_dict["field_type"]``.
    sep : str
        Separator for creating new one-hot-encoded variable names.

    Returns
    -------
    pandas.DataFrame
        VERTEX dictionary containing the original variables, plus the new
        variables and any one-hot-encoded variables derived from this.
    """  # noqa : E501
    # Convert dict values to list if all are strings (otherwise a pandas error)
    if all(isinstance(v, str) for v in new_variable_dict.values()):
        new_variable_dict = {k: [v] for k, v in new_variable_dict.items()}
    new_dictionary = pd.DataFrame.from_dict(new_variable_dict)
    # Fractional sort key; filled in per-row below.
    new_dictionary["index"] = np.nan
    # Keep the original row order as an "index" column so it can be used as
    # the sort key after concatenation.
    dictionary = dictionary.reset_index(drop=False)
    for ind in new_dictionary.index:
        parent = new_dictionary.loc[ind, "parent"]
        if parent not in new_dictionary["field_name"].values:
            if parent not in dictionary["field_name"].values:
                # Unknown parent: place the new row just after the last row.
                new_ind = dictionary["index"].max() + 0.1
            else:
                new_ind = dictionary.loc[
                    (dictionary["field_name"] == parent), "index"
                ].max()
                # NOTE(review): `.loc[new_ind, ...]` uses an "index" column
                # *value* as a row label, which assumes the incoming
                # dictionary had a default RangeIndex — TODO confirm.
                parent = dictionary.loc[new_ind, "field_name"]
                # Walk down to the last descendant of the parent so the new
                # row lands after the parent's whole family of children.
                while parent in dictionary["parent"].values:
                    new_ind = dictionary.loc[
                        (dictionary["parent"] == parent), "index"
                    ].max()
                    parent = dictionary.loc[new_ind, "field_name"]
                new_ind = new_ind + 0.1
        else:
            # Parent is itself one of the new variables: slot in just after it.
            new_ind = (
                new_dictionary.loc[
                    (new_dictionary["field_name"] == parent), "index"
                ].max()
                + 0.1
            )
            # new_ind = new_ind + 0.1
        new_dictionary.loc[ind, "index"] = new_ind
    categorical_ind = new_dictionary["field_type"].isin(["categorical"])
    new_dictionary_list = [new_dictionary]
    # One-hot-expand the new categorical variables whose data column exists.
    ind_list = [
        ind
        for ind in categorical_ind.loc[categorical_ind].index
        if new_dictionary.loc[ind, "field_name"] in data.columns
    ]
    for ind in ind_list:
        variable = new_dictionary.loc[ind, "field_name"]
        # Observed category values, sorted; booleans and values already
        # present in the new dictionary are skipped.
        options = data[variable].drop_duplicates().sort_values().dropna()
        options = [y for y in options if y not in (True, False)]
        options = [
            y
            for y in options
            if (variable + sep + str(y)) not in new_dictionary["field_name"].values
        ]
        add_options = pd.DataFrame(
            columns=dictionary.columns, index=range(len(options))
        )
        add_options["field_name"] = [variable + sep + str(y) for y in options]
        add_options["form_name"] = new_dictionary.loc[ind, "form_name"]
        add_options["branching_logic"] = new_dictionary.loc[ind, "branching_logic"]
        add_options["field_type"] = "binary"
        add_options["field_label"] = options
        add_options["parent"] = variable
        # Evenly space the one-hot rows between the parent's key and the next
        # whole index so they sort directly after their categorical parent.
        add_options["index"] = np.linspace(
            new_dictionary.loc[ind, "index"] + 0.2,
            np.ceil(new_dictionary.loc[ind, "index"]) - 0.1,
            len(options),
        )
        new_dictionary_list += [add_options]
    dictionary = pd.concat([dictionary] + new_dictionary_list, axis=0)
    # Interleave new rows by the fractional key, then drop the helper column.
    dictionary = dictionary.sort_values(by="index").drop(columns="index")
    dictionary = dictionary.reset_index(drop=True)
    return dictionary
[docs]
def get_variables_by_section_and_type(
data: pandas.DataFrame,
dictionary: pandas.DataFrame,
required_variables: typing.Iterable[str] | None = None,
include_sections: typing.Iterable[str] = ["demog"],
include_types: typing.Iterable[str] = ["binary", "categorical", "numeric"],
exclude_suffixes: typing.Iterable[str] = [
"_units",
"addi",
"otherl2",
"item",
"_oth",
"_unlisted",
"otherl3",
],
include_subjid: bool = False,
) -> list[str]:
""":py:class:`list` : Returns a list of all variables in the dataframe from specified sections and types, plus any required variables.
Parameters
----------
data : pandas.DataFrame
Incoming data.
dictionary : pandas.DataFrame
Data dictionary.
required_variables : typing.Iterable, default=None
Optional iterable of required variable names, defaults to ``None``.
include_sections : typing.Iterable, default=["demog"]
Optional iterable of names of sections to include, defaults to
``["demog"]``.
include_types : typing.Iterable, default=["binary", "categorical", "numeric"]
Optional iterable of variable type names, defaults to
``["binary", "categorical", "numeric"]``.
exclude_suffixes : typing.Iterable, default=``["_units", "addi", "otherl2", "item", "_oth", "_unlisted", "otherl3"]``
Optional iterable of suffixes to exclude, defaults to
``["_units", "addi", "otherl2", "item", "_oth", "_unlisted", "otherl3"]``.
include_subjid : bool, default=False
Optional boolean to indicate whether to include subject ID, defaults
to ``False``.
Returns
-------
list
A list of all variables in the dataframe from specified sections and
types, plus any required variables.
""" # noqa : E501
if "section" in dictionary.columns: # TODO: this should always be True in future
include_ind = dictionary["section"].isin(include_sections)
else:
include_ind = dictionary["field_name"].apply(
lambda x: x.startswith(tuple(x + "_" for x in include_sections))
)
include_ind &= dictionary["field_type"].isin(include_types)
# include_ind &= (dictionary['field_name'].apply(
# lambda x: x.endswith(tuple('___' + x for x in exclude_suffix))) == 0)
include_ind &= (
dictionary["field_name"].apply(
lambda x: x.endswith(tuple(x for x in exclude_suffixes))
)
== 0
)
if isinstance(required_variables, list):
include_ind |= dictionary["field_name"].isin(required_variables)
if include_subjid:
include_ind |= dictionary["field_name"] == "subjid"
include_variables = dictionary.loc[include_ind, "field_name"].tolist()
return [col for col in include_variables if col in data.columns]
[docs]
def convert_categorical_to_onehot(
    data: pandas.DataFrame,
    dictionary: pandas.DataFrame,
    categorical_columns: typing.Iterable[str],
    sep: str = "___",
    missing_val: str = "nan",
    drop_first: bool = False,
) -> pandas.DataFrame:
    """:py:class:`pandas.DataFrame` : Returns the given dataframe with categorical variable columns converted to onehot-encoded variable columns.

    Missing values are first replaced with the ``missing_val`` sentinel so
    that ``pd.get_dummies`` creates a "<column><sep><missing_val>" indicator;
    rows flagged by that indicator are then restored to NaN across all of the
    variable's one-hot columns, and the sentinel column is dropped.

    Parameters
    ----------
    data : pandas.DataFrame
        The incoming data.
    dictionary : pandas.DataFrame
        The data dictionary.
    categorical_columns : typing.Iterable
        An iterable of categorical column names.
    sep : str, default="___"
        Field-value separator, defaults to ``"___"``
    missing_val : str, default="nan"
        Optional value with which to replace missing values, defaults to
        ``"nan"``.
    drop_first : bool, default=False
        Optional boolean: if True, drop the first one-hot level listed in the
        dictionary for each variable (reference category), as in
        ``pd.get_dummies(drop_first=True)`` — but only for variables with no
        missing-value sentinel column.
    Returns
    -------
    pandas.DataFrame
        The original dataframe with the categorical -> one-hot-encoded variable
        columns.
    """  # noqa : E501
    # Keep only the categorical columns actually present in the data.
    categorical_columns = [col for col in data.columns if col in categorical_columns]
    # NOTE(review): this fillna writes through `.loc` and therefore mutates
    # the caller's dataframe in place — confirm this side effect is intended.
    data.loc[:, categorical_columns] = data[categorical_columns].fillna(missing_val)
    data = pd.get_dummies(data, columns=categorical_columns, prefix_sep=sep)
    for categorical_column in categorical_columns:
        # All one-hot columns derived from this categorical variable.
        onehot_columns = [
            var for var in data.columns if var.split(sep)[0] == categorical_column
        ]
        # Cast to object so the columns can hold NaN alongside booleans.
        data[onehot_columns] = data[onehot_columns].astype(object)
        if (categorical_column + sep + missing_val) in data.columns:
            # Restore missingness: rows flagged by the sentinel indicator get
            # NaN across all of this variable's one-hot columns.
            mask = data[categorical_column + sep + missing_val] == 1
            data.loc[mask, onehot_columns] = np.nan
            data = data.drop(columns=[categorical_column + sep + missing_val])
        elif (
            drop_first
            and isinstance(dictionary, pd.DataFrame)
            and "field_name" in dictionary.columns
        ):
            # Drop the first dictionary-listed one-hot level of this variable
            # (the reference category).
            drop_column_ind = dictionary.apply(
                lambda x: (
                    (x["parent"] == categorical_column)
                    & (x["field_name"].split(sep)[0] == categorical_column)
                ),
                axis=1,
            )
            if drop_column_ind.any():
                data = data.drop(
                    columns=[dictionary.loc[drop_column_ind, "field_name"].values[0]]
                )
    if isinstance(dictionary, pd.DataFrame) and "field_name" in dictionary.columns:
        # Reorder columns to follow dictionary order, then append any columns
        # not known to the dictionary.
        columns = [
            col for col in dictionary["field_name"].values if col in data.columns
        ]
        columns += [
            col for col in data.columns if col not in dictionary["field_name"].values
        ]
        data = data[columns]
    return data
[docs]
def convert_onehot_to_categorical(
    data: pandas.DataFrame,
    dictionary: pandas.DataFrame,
    categorical_columns: typing.Iterable[str],
    sep: str = "___",
    missing_val: str = "nan",
) -> pandas.DataFrame:
    """:py:class:`pandas.DataFrame` : Returns the given dataframe with onehot-encoded variable columns to categorical variable columns.

    The inverse of :py:func:`convert_categorical_to_onehot`: for each target
    variable, a "<column><sep><missing_val>" sentinel column is added for rows
    where no one-hot column is set (or any is NaN), so that every row has
    exactly one indicator set and ``pd.from_dummies`` can reconstruct the
    categorical value.

    Parameters
    ----------
    data : pandas.DataFrame
        The incoming data.
    dictionary : pandas.DataFrame
        Data dictionary.
    categorical_columns : typing.Iterable
        An iterable of categorical column names.
    sep : str, default="___"
        Optional field-value separator, defaults to ``"___"``.
    missing_val : str, default="nan"
        Optional value with which to replace missing values, defaults to
        ``"nan"``.

    Returns
    -------
    pandas.DataFrame
        The original dataframe with the one-hot-encoded -> categorical variable
        columns.
    """  # noqa : E501
    # Pre-allocate the (empty) target columns for the reconstructed values.
    data = pd.concat([data, pd.DataFrame(columns=categorical_columns)], axis=1)
    for categorical_column in categorical_columns:
        onehot_columns = list(
            data.columns[data.columns.str.startswith(categorical_column + sep)]
        )
        # Preserve missingness
        data.loc[:, categorical_column + sep + missing_val] = (
            data[onehot_columns].any(axis=1) == 0
        ) | (data[onehot_columns].isna().any(axis=1))
        # Opt into pandas' future fillna behaviour (no silent downcasting) to
        # avoid a FutureWarning / dtype surprises.
        with pd.option_context("future.no_silent_downcasting", True):
            data.loc[:, onehot_columns] = data[onehot_columns].fillna(False)
        onehot_columns += [categorical_column + sep + missing_val]
        # Each row now has exactly one indicator set, as from_dummies requires.
        data.loc[:, categorical_column] = pd.from_dummies(data[onehot_columns], sep=sep)
        data = data.drop(columns=onehot_columns)
    # Reorder columns to follow dictionary order, then append any extras.
    columns = [col for col in dictionary["field_name"].values if col in data.columns]
    columns += [
        col for col in data.columns if col not in dictionary["field_name"].values
    ]
    data = data[columns]
    return data
[docs]
def from_timeA_to_timeB(
    data: pandas.DataFrame,
    dictionary: pandas.DataFrame,
    timeA_column: str,
    timeB_column: str,
    timediff_column: str,
    timediff_label: str,
    time_unit: str = "days",
) -> tuple[pandas.DataFrame, pandas.DataFrame]:
    """:py:class:`tuple` : Returns the data and data dictionary updated with time difference calculation between two given data columns.

    The new column is written into the passed ``data`` frame in place, and the
    variable is registered in the dictionary via
    :py:func:`extend_dictionary`. Supported units are ``"days"``, ``"months"``
    (whole calendar months) and ``"years"`` (whole calendar years); any other
    value leaves the data unchanged but still registers the variable.

    Parameters
    ----------
    data : pandas.DataFrame
        The incoming data.
    dictionary : pandas.DataFrame
        The data dictionary.
    timeA_column : str
        The name of the first time column.
    timeB_column : str
        The name of the second time column.
    timediff_column : str
        The name of the column for storing the time difference of the two
        given time columns.
    timediff_label : str
        A label to attach to the time difference column.
    time_unit : str, default="days"
        An optional indicator of which time unit to use for the time difference
        calculation, defaults to ``"days"``.

    Returns
    -------
    tuple
        The data and data dictionary updated with time difference information.
    """  # noqa : E501
    if time_unit == "days":
        data[timediff_column] = (data[timeB_column] - data[timeA_column]).dt.days
    elif time_unit == "months":
        # Whole-month difference from the year and month components.
        data[timediff_column] = (
            data[timeB_column].dt.year - data[timeA_column].dt.year
        ) * 12 + (data[timeB_column].dt.month - data[timeA_column].dt.month)
    elif time_unit == "years":
        # BUG FIX: this branch was previously a duplicated (and therefore
        # unreachable) `time_unit == "months"` condition.
        data[timediff_column] = data[timeB_column].dt.year - data[timeA_column].dt.year
    time_dict = {
        "field_name": timediff_column,
        # NOTE(review): "form_name" is read from the "field_name" column, so
        # it just echoes timeB_column; presumably this should read the
        # "form_name" column instead — preserved as-is pending confirmation.
        "form_name": dictionary.loc[
            (dictionary["field_name"] == timeB_column), "field_name"
        ].values[0],
        "field_type": "numeric",
        "field_label": timediff_label,
        "parent": dictionary.loc[
            (dictionary["field_name"] == timeB_column), "parent"
        ].values[0],
    }
    dictionary = extend_dictionary(dictionary, time_dict, data)
    return data, dictionary
############################################
############################################
# Descriptive table
############################################
############################################
[docs]
def get_mean_and_stdev(
    series: pandas.Series,
    add_spaces: bool = False,
    dp: int = 1,
    mfw: int = 4,
    min_n: int = 3,
) -> str:
    """:py:class:`str` : Returns the mean and standard deviation of a series of values as a formatted string.

    Output has the form ``"<mean> (<std>) | <N>"`` where ``N`` is the number
    of non-missing values; ``"N/A"`` is returned when fewer than ``min_n``
    values are available.

    Parameters
    ----------
    series : pandas.Series
        The input series for which to calculate the mean and st. dev.
    add_spaces : bool, default=False
        Pad the numbers to fixed field widths so values align across rows.
    dp : int, default=1
        Number of decimal places for the mean and standard deviation.
    mfw : int, default=4
        Minimum field width used to pad the count when ``add_spaces`` is True.
    min_n : int, default=3
        Minimum number of non-missing values; below this, "N/A" is returned.

    Returns
    -------
    str
        The mean and st. dev. as a formatted string.
    """  # noqa : E501
    n = int(series.notna().sum())
    if n < min_n:
        return "N/A"
    mean = series.mean()
    std = series.std()
    if add_spaces:
        # Field width for mean/std: order of magnitude of the mean (at least
        # one digit) plus the decimal point and dp decimals.
        # BUG FIX: this was `np.log10(series.mean(), 1)`, which passed 1 as
        # numpy's `out` argument and raised a TypeError; the intent was
        # `max(np.log10(...), 1)`. abs() guards against negative means.
        magnitude = np.log10(abs(mean)) if mean != 0 else 1.0
        mfw_f = int(max(magnitude, 1)) + 2 + dp
        output_str = "%*.*f" % (mfw_f, dp, mean) + " ("
        output_str += "%*.*f" % (mfw_f, dp, std) + ") | "
        output_str += "%*g" % (mfw, n)
    else:
        output_str = "%.*f" % (dp, mean) + " ("
        output_str += "%.*f" % (dp, std) + ") | "
        output_str += str(n)
    return output_str
[docs]
def mean_std_str(
    series: pandas.Series,
    add_spaces: bool = False,
    dp: int = 1,
    mfw: int = 4,
    min_n: int = 3,
) -> str:
    """:py:class:`str` : Returns the mean and standard deviation of a series of values as a formatted string.

    .. warning::
        DEPRECATED. Please use :py:func:`~isaricanalytics.analytics.get_mean_and_stdev` instead.

    Parameters
    ----------
    series : pandas.Series
        The input series for which to calculate the mean and st. dev.
    add_spaces : bool, default=False
        Add spacing in the string.
    dp : int, default=1
        Number of decimal places for the mean and standard deviation.
    mfw : int, default=4
        Minimum field width used when ``add_spaces`` is True.
    min_n : int, default=3
        Minimum number of non-missing values; below this, "N/A" is returned.

    Returns
    -------
    str
        The mean and st. dev. as a formatted string.
    """  # noqa : E501
    # Deprecated alias: warn, then delegate to the replacement unchanged.
    message = (
        "`analytics.mean_std_str` is deprecated; "
        "please use `analytics.get_mean_and_stdev` instead."
    )
    warnings.warn(message, DeprecationWarning, stacklevel=2)
    return get_mean_and_stdev(
        series,
        add_spaces=add_spaces,
        dp=dp,
        mfw=mfw,
        min_n=min_n,
    )
[docs]
def get_n_percent_value(
    series: pandas.Series,
    add_spaces: bool = False,
    dp: int = 1,
    mfw: int = 4,
    min_n: int = 1,
) -> str:
    """:py:class:`str` : Returns the n-percent value of a series as a string.

    Output has the form ``"<count> (<percent>) | <N>"`` where ``count`` is the
    number of truthy values, ``percent`` the proportion of truthy values among
    the ``N`` non-missing values; ``"N/A"`` is returned when fewer than
    ``min_n`` values are available.

    Parameters
    ----------
    series : pandas.Series
        The input series.
    add_spaces : bool, default=False
        Pad the numbers to fixed field widths so values align across rows.
    dp : int, default=1
        Number of decimal places for the percentage.
    mfw : int, default=4
        Minimum field width used for the counts when ``add_spaces`` is True.
    min_n : int, default=1
        Minimum number of non-missing values; below this, "N/A" is returned.

    Returns
    -------
    str
        The n-percent value of the series as a string.
    """  # noqa : E501
    n_known = int(series.notna().sum())
    if n_known < min_n:
        return "N/A"
    n_true = int(series.sum())
    percent = 100 * series.mean()
    if not add_spaces:
        return "%d (%.*f) | %d" % (n_true, dp, percent, n_known)
    # Exactly 100% is printed without the usual leading pad so it fits the
    # same five-character field as e.g. " 75.0".
    pct_str = "100.0" if percent == 100 else "%5.*f" % (dp, percent)
    return "%*g (%s) | %*g" % (mfw, n_true, pct_str, mfw, n_known)
[docs]
def n_percent_str(
    series: pandas.Series,
    add_spaces: bool = False,
    dp: int = 1,
    mfw: int = 4,
    min_n: int = 1,
) -> str:
    """:py:class:`str` : Returns the n-percent value of a series as a string.

    .. warning::
        DEPRECATED. Please use :py:func:`~isaricanalytics.analytics.get_n_percent_value` instead.

    Parameters
    ----------
    series : pandas.Series
        The input series.
    add_spaces : bool, default=False
        Add spacing around the string.
    dp : int, default=1
        Number of decimal places for the percentage.
    mfw : int, default=4
        Minimum field width used when ``add_spaces`` is True.
    min_n : int, default=1
        Minimum number of non-missing values; below this, "N/A" is returned.

    Returns
    -------
    str
        The n-percent value of the series as a string.
    """  # noqa : E501
    # Deprecated alias: warn, then delegate to the replacement unchanged.
    message = (
        "`analytics.n_percent_str` is deprecated; "
        "please use `analytics.get_n_percent_value` instead."
    )
    warnings.warn(message, DeprecationWarning, stacklevel=2)
    return get_n_percent_value(
        series,
        add_spaces=add_spaces,
        dp=dp,
        mfw=mfw,
        min_n=min_n,
    )
[docs]
def get_chi2_pvalue(
    x: pandas.Series,
    y: pandas.Series,
    x_cat: typing.Iterable[typing.Any] = [True, False],
    y_cat: typing.Iterable[typing.Any] = [True, False],
) -> float:
    """:py:class:`float` : Returns the :math:`p`-value for a Chi-squared test.

    Parameters
    ----------
    x : pandas.Series
        The first series/factor.
    y : pandas.Series
        The second series/factor.
    x_cat : typing.Iterable
        An iterable of categories by which to group the first series.
    y_cat : typing.Iterable
        An iterable of categories by which to group the second series.

    Returns
    -------
    float
        The :math:`p`-value for the test, or ``numpy.nan`` if the test cannot
        be computed (e.g. a zero row/column in the contingency table).
    """  # noqa : E501
    # BUG FIX: removed a leftover debug `print(x.name)`.
    try:
        # Both series are masked on x's non-missing entries so the pairing
        # stays aligned; y values where x is missing are dropped too.
        mask = x.notna()
        contingency = pd.crosstab(
            pd.Categorical(x.loc[mask], categories=x_cat),
            pd.Categorical(y.loc[mask], categories=y_cat),
            margins=False,
            dropna=False,
        )
        pvalue = chi2_contingency(contingency).pvalue
    except ValueError:
        # chi2_contingency raises when the table has a zero row or column.
        pvalue = np.nan
    return pvalue
[docs]
def get_fisher_exact_pvalue(
    x: pandas.Series,
    y: pandas.Series,
    x_cat: typing.Iterable[typing.Any] = [True, False],
    y_cat: typing.Iterable[typing.Any] = [True, False],
) -> float:
    """:py:class:`float` : Returns the :math:`p`-value for a Fisher exact test.

    Parameters
    ----------
    x : pandas.Series
        The first series/factor.
    y : pandas.Series
        The second series/factor.
    x_cat : typing.Iterable
        An iterable of categories by which to group the first series.
    y_cat : typing.Iterable
        An iterable of categories by which to group the second series.

    Returns
    -------
    float
        The :math:`p`-value for the test, or ``numpy.nan`` if the odds ratio
        is infinite/undefined or the test cannot be computed.
    """  # noqa : E501
    # FIX: added the missing `-> float` return annotation for consistency
    # with `get_chi2_pvalue`.
    try:
        # Both series are masked on x's non-missing entries so the pairing
        # stays aligned; y values where x is missing are dropped too.
        mask = x.notna()
        contingency = pd.crosstab(
            pd.Categorical(x.loc[mask], categories=x_cat),
            pd.Categorical(y.loc[mask], categories=y_cat),
            margins=False,
            dropna=False,
        )
        result = fisher_exact(contingency)
        # A degenerate table (zero cell producing an inf/undefined odds
        # ratio) is reported as NaN rather than a misleading p-value.
        if np.isinf(result.statistic) or np.isnan(result.statistic):
            pvalue = np.nan
        else:
            pvalue = result.pvalue
    except ValueError:
        pvalue = np.nan
    return pvalue
[docs]
def get_descriptive_data(
    data: pandas.DataFrame,
    dictionary: pandas.DataFrame,
    by_column: str | None = None,
    include_sections: typing.Iterable[str] = ["demog"],
    include_types: typing.Iterable[str] = ["binary", "categorical", "numeric"],
    exclude_suffix: typing.Iterable[str] = [
        "_units",
        "addi",
        "otherl2",
        "item",
        "_oth",
        "_unlisted",
        "otherl3",
    ],
    include_subjid: bool = False,
    exclude_negatives: bool = True,
    sep: str = "___",
) -> pandas.DataFrame:
    """:py:class:`pandas.DataFrame` : Returns descriptive data.

    Selects the requested sections/types, one-hot-encodes categorical
    variables (except ``by_column``), optionally drops explicit-negative
    one-hot columns, and fills missing ``by_column`` values with "Unknown".

    Parameters
    ----------
    data : pandas.DataFrame
        Incoming data.
    dictionary : pandas.DataFrame
        Data dictionary.
    by_column : str, default=None
        Optional grouping column, kept as a categorical column, defaults to
        ``None``.
    include_sections : typing.Iterable, default=["demog"]
        Optional list of names of sections to include, defaults to
        ``["demog"]``.
    include_types : typing.Iterable, default=["binary", "categorical", "numeric"]
        Optional iterable of variable type names, defaults to
        ``["binary", "categorical", "numeric"]``.
    exclude_suffix : typing.Iterable, default=["_units", "addi", "otherl2", "item", "_oth", "_unlisted", "otherl3"]
        Optional iterable of suffixes to exclude, defaults to
        ``["_units", "addi", "otherl2", "item", "_oth", "_unlisted", "otherl3"]``.
    include_subjid : bool, default=False
        Optional boolean to indicate whether to include subject ID, defaults to
        ``False``.
    exclude_negatives : bool, default=True
        Optional boolean to indicate whether to drop negatives, defaults to
        ``True``.
    sep : str, default="___"
        Optional field-value separator, defaults to ``"___"``.

    Returns
    -------
    pandas.DataFrame
        Returns the descriptive data.
    """  # noqa : E501
    _data = data.copy()
    include_columns = get_variables_by_section_and_type(
        _data,
        dictionary,
        include_types=include_types,
        include_subjid=include_subjid,
        include_sections=include_sections,
        # BUG FIX: the callee's parameter is named `exclude_suffixes`;
        # passing `exclude_suffix=` raised a TypeError on every call.
        exclude_suffixes=exclude_suffix,
    )
    if (by_column is not None) and (by_column not in include_columns):
        include_columns = [by_column] + include_columns
    _data = _data[include_columns].dropna(axis=1, how="all").copy()
    # Convert categorical variables to onehot-encoded binary columns
    categorical_ind = dictionary["field_type"] == "categorical"
    columns = dictionary.loc[categorical_ind, "field_name"].tolist()
    columns = [col for col in columns if col != by_column]
    _data = convert_categorical_to_onehot(
        _data, dictionary, categorical_columns=columns, sep=sep
    )
    # If the grouping column arrived one-hot encoded, rebuild it as a single
    # categorical column.
    if (by_column is not None) and (by_column not in _data.columns):
        _data = convert_onehot_to_categorical(
            _data, dictionary, categorical_columns=[by_column], sep=sep
        )
    # One-hot columns for explicit negatives (e.g. "no") add no information
    # to descriptive counts, so they can be dropped.
    negative_values = ("no", "never smoked")
    negative_columns = [
        col for col in _data.columns if col.split(sep)[-1].lower() in negative_values
    ]
    if exclude_negatives:
        _data.drop(columns=negative_columns, inplace=True)
    # Remove columns with only NaN values
    _data = _data.dropna(axis=1, how="all")
    if by_column is not None:
        _data.fillna({by_column: "Unknown"}, inplace=True)
    return _data
[docs]
def descriptive_table(
    data: pandas.DataFrame,
    dictionary: pandas.DataFrame,
    by_column: str | None = None,
    include_totals: bool = True,
    column_reorder: typing.Iterable[str] | None = None,
    include_raw_variable_name: bool = False,
    sep: str = "___",
) -> tuple[pandas.DataFrame, str]:
    """:py:class:`tuple` : Returns the descriptive table and table key for binary and numerical variables in the data.

    The descriptive table will have separate columns for each category that
    exists for the ``by_column`` variable, if this is provided.

    Parameters
    ----------
    data : pandas.DataFrame
        Incoming data.
    dictionary : pandas.DataFrame
        Data dictionary.
    by_column : str, default=None
        Optional grouping column: one table column per category, defaults to
        ``None``.
    include_totals : bool, default=True
        Optional boolean indicating whether to prepend a "Totals" row,
        defaults to ``True``.
    column_reorder : typing.Iterable, default=None
        Optional iterable of names of columns to reorder by, defaults to
        ``None``.
    include_raw_variable_name : bool, default=False
        Optional boolean indicating whether to include the raw variable name,
        defaults to ``False``.
    sep : str, default="___"
        Optional field-value separator, defaults to ``"___"``.

    Returns
    -------
    tuple
        Returns the descriptive table and table key for binary and numerical
        variables in the data.
    """  # noqa : E501
    _data = data.copy()
    # Numeric and binary variables present in both the dictionary and data.
    numeric_ind = dictionary["field_type"] == "numeric"
    numeric_columns = dictionary.loc[numeric_ind, "field_name"].tolist()
    numeric_columns = [col for col in numeric_columns if col in _data.columns]
    binary_ind = dictionary["field_type"] == "binary"
    binary_columns = dictionary.loc[binary_ind, "field_name"].tolist()
    binary_columns = [col for col in binary_columns if col in _data.columns]
    # Add columns for section headers and categorical questions
    index = numeric_columns + binary_columns
    index += dictionary.loc[(dictionary["field_name"].isin(index)), "parent"].tolist()
    table_dictionary = dictionary.loc[(dictionary["field_name"].isin(index))]
    # Row order follows the dictionary's order.
    index = table_dictionary["field_name"].tolist()
    table_columns = ["Variable", "All"]
    if by_column is not None:
        # One column per observed category, optionally reordered.
        add_columns = list(_data[by_column].unique())
        if column_reorder is not None:
            table_columns += [col for col in column_reorder if col in add_columns]
            table_columns += [col for col in add_columns if col not in column_reorder]
        else:
            table_columns += add_columns
    table_columns += ["Raw variable name"]
    table = pd.DataFrame("", index=index, columns=table_columns)
    # Header/parent rows have no data column, so they get an empty raw name.
    table["Raw variable name"] = [var if var in _data.columns else "" for var in index]
    table["Variable"] = format_descriptive_table_variables(
        table_dictionary, sep=sep
    ).tolist()
    mfw = int(np.log10(_data.shape[0])) + 1  # Min field width, for formatting
    table.loc[numeric_columns, "All"] = _data[numeric_columns].apply(
        median_iqr_str, mfw=mfw
    )
    table.loc[binary_columns, "All"] = _data[binary_columns].apply(
        n_percent_str, mfw=mfw
    )
    totals = pd.DataFrame(columns=table_columns, index=["totals"])
    totals["Variable"] = "<b>Totals</b>"
    totals["All"] = _data.shape[0]
    if by_column is not None:
        choices_values = _data[by_column].unique()
        for value in choices_values:
            ind = _data[by_column] == value
            mfw = int(np.log10(ind.sum())) + 1  # Min field width, for format
            table.loc[numeric_columns, value] = _data.loc[ind, numeric_columns].apply(
                median_iqr_str, mfw=mfw
            )
            table.loc[binary_columns, value] = _data.loc[ind, binary_columns].apply(
                n_percent_str, mfw=mfw
            )
            totals[value] = ind.sum()
    table = table.reset_index(drop=True)
    if include_totals:
        # The totals row goes first.
        table = pd.concat([totals, table], axis=0).reset_index(drop=True)
    table_key = "<b>KEY</b><br>(*) Count (%) | N<br>(+) Median (IQR) | N"
    if include_raw_variable_name is False:
        table.drop(columns=["Raw variable name"], inplace=True)
    return table, table_key
[docs]
def descriptive_comparison_table(
    data: pandas.DataFrame,
    dictionary: pandas.DataFrame,
    by_column: str | None = None,
    include_totals: bool = True,
    column_reorder: typing.Iterable[str] | None = None,
    sep: str = "___",
    pvalue_significance: dict[str, float] = {"*": 0.05, "**": 0.01},
) -> tuple[pd.DataFrame, str]:
    """:py:class:`tuple` : Returns the descriptive comparison table and table key for binary (including onehot-encoded categorical) and numerical variables in data.

    The descriptive table will have separate columns for each category that
    exists for the ``by_column`` variable, if this is provided. P-values are
    only computed when ``by_column`` has exactly two categories (Fisher's
    exact test for binary variables; Mann-Whitney U for numeric variables).

    Parameters
    ----------
    data : pandas.DataFrame
        Incoming data.
    dictionary : pandas.DataFrame
        Data dictionary.
    by_column : str, default=None
        Optional grouping column: one table column per category, defaults to
        ``None``.
    include_totals : bool, default=True
        Optional boolean indicating whether to prepend a "Totals" row,
        defaults to ``True``.
    column_reorder : typing.Iterable, default=None
        Optional iterable of names of columns to reorder by, defaults to
        ``None``.
    sep : str, default="___"
        Optional field-value separator, defaults to ``"___"``.
    pvalue_significance : dict, default={"*": 0.05, "**": 0.01}
        A dict of significance levels, defaults to ``{"*": 0.05, "**": 0.01}``.

    Returns
    -------
    tuple
        Returns the descriptive comparison table and table key for binary and
        numerical variables in the data.
    """  # noqa : E501
    _data = data.copy()
    numeric_ind = dictionary["field_type"] == "numeric"
    numeric_columns = dictionary.loc[numeric_ind, "field_name"].tolist()
    numeric_columns = [col for col in numeric_columns if col in _data.columns]
    binary_ind = dictionary["field_type"] == "binary"
    binary_columns = dictionary.loc[binary_ind, "field_name"].tolist()
    binary_columns = [col for col in binary_columns if col in _data.columns]
    # Add columns for section headers and categorical questions
    index = numeric_columns + binary_columns
    index += dictionary.loc[(dictionary["field_name"].isin(index)), "parent"].tolist()
    table_dictionary = dictionary.loc[(dictionary["field_name"].isin(index))]
    index = table_dictionary["field_name"].tolist()
    table_columns = ["Variable", "All"]
    if by_column is not None:
        add_columns = list(_data[by_column].unique())
        if column_reorder is not None:
            table_columns += [col for col in column_reorder if col in add_columns]
            table_columns += [col for col in add_columns if col not in column_reorder]
        else:
            table_columns += add_columns
    table_columns += ["p-value"]
    table = pd.DataFrame("", index=index, columns=table_columns)
    table["Variable"] = format_descriptive_table_variables(
        table_dictionary, sep=sep, binary_symbol="†"
    ).tolist()
    mfw = int(np.log10(_data.shape[0])) + 1  # Min field width, for formatting
    table.loc[numeric_columns, "All"] = _data[numeric_columns].apply(
        median_iqr_str, mfw=mfw
    )
    table.loc[binary_columns, "All"] = _data[binary_columns].apply(
        n_percent_str, mfw=mfw
    )
    totals = pd.DataFrame(columns=table_columns, index=["totals"])
    totals["Variable"] = "<b>Totals</b>"
    totals["All"] = _data.shape[0]
    # BUG FIX: `pvalues` was only defined inside the two-group branch below,
    # so building the table key raised a NameError whenever `by_column` was
    # None or had a number of categories other than two.
    pvalues = pd.Series(np.nan, index=table.index)
    if by_column is not None:
        by_column_values = _data[by_column].unique()
        for value in by_column_values:
            ind = _data[by_column] == value
            mfw = int(np.log10(ind.sum())) + 1  # Min field width, for format
            table.loc[numeric_columns, value] = _data.loc[ind, numeric_columns].apply(
                median_iqr_str, mfw=mfw
            )
            table.loc[binary_columns, value] = _data.loc[ind, binary_columns].apply(
                n_percent_str, mfw=mfw
            )
            totals[value] = ind.sum()
        if len(by_column_values) == 2:
            # Boolean membership indicator for the first of the two groups.
            y = _data[by_column] == by_column_values[0]
            # BUG FIX: the Mann-Whitney test previously compared each numeric
            # column against the 0/1 group indicator itself
            # (mannwhitneyu(x, y)); it must compare the column's values
            # between the two groups.
            pvalues.loc[numeric_columns] = _data[numeric_columns].apply(
                lambda x: mannwhitneyu(x[y], x[~y], nan_policy="omit").pvalue,
                axis=0,
            )
            pvalues.loc[binary_columns] = _data[binary_columns].apply(
                lambda x: get_fisher_exact_pvalue(x, y), axis=0
            )
            table["p-value"] = pvalues.apply(
                format_pvalue, significance=pvalue_significance
            )
    table = table.reset_index(drop=True)
    if include_totals:
        table = pd.concat([totals, table], axis=0).reset_index(drop=True)
    # BUG FIX: the key claimed a Chi-squared test, but binary p-values are
    # computed with Fisher's exact test above.
    table_key = """<b>KEY</b><br>(†) Count (%) | N, p-value using Fisher's exact
    test, (+) Median (IQR) | N, p-value using Mann-Whitney test<br>"""
    table_key += ", ".join(
        [
            f"({k}) p < {str(v)}"
            for k, v in pvalue_significance.items()
            if (pvalues < v).any()
        ]
    )
    return table, table_key
############################################
############################################
# Formatting
############################################
############################################
[docs]
def trim_field_label(x: str, max_len: int = 40) -> str:
    """:py:class:`str` : Trims field label using an optional max. length parameter that defaults to 40 characters.

    Labels longer than ``max_len`` are cut back to the last complete word
    within the first ``max_len`` characters and suffixed with ``" ..."``.

    Parameters
    ----------
    x : str
        The input field label.
    max_len : int, default=40
        An optional maximum length parameter to use for trimming, defaults to
        :math:`40`.

    Returns
    -------
    str
        The trimmed field label.
    """  # noqa : E501
    if len(x) <= max_len:
        return x
    # Drop the (possibly cut-off) final word of the truncated label.
    words = x[:max_len].split(" ")
    return " ".join(words[:-1]) + " ..."
############################################
############################################
# Counts
############################################
############################################
[docs]
def get_counts(
    data: pandas.DataFrame,
    dictionary: pandas.DataFrame,
    max_n_variables: int = 10,
    sep: str = "___",
) -> pandas.DataFrame:
    """:py:class:`pandas.DataFrame` : Returns a dataframe of variable column counts.

    Parameters
    ----------
    data : pandas.DataFrame
        The incoming data.
    dictionary : pandas.DataFrame
        The data dictionary.
    max_n_variables : int, default=10
        Optional number of variables for which to take counts, defaults to
        :math:`10`.
    sep : str, default="___"
        Optional field-value separator, defaults to ``"___"``.

    Returns
    -------
    pandas.DataFrame
        The variable column counts dataframe with ``variable``, ``count``,
        ``label`` and ``short_label`` columns.
    """  # noqa : E501
    # Column-wise totals, most frequent first
    counts = data.sum().reset_index()
    counts.columns = ["variable", "count"]
    counts = counts.sort_values(by=["count"], ascending=False, ignore_index=True)
    # Retain only the max_n_variables most frequent variables
    counts = counts.head(max_n_variables)
    # Attach long and short human-readable labels from the dictionary
    field_names = dictionary["field_name"]
    long_labels = dict(
        zip(field_names, format_variables(dictionary, max_len=1000, sep=sep))
    )
    short_labels = dict(
        zip(field_names, format_variables(dictionary, max_len=40, sep=sep))
    )
    counts["label"] = counts["variable"].map(long_labels)
    counts["short_label"] = counts["variable"].map(short_labels)
    return counts
[docs]
def get_proportions(
    data: pandas.DataFrame,
    dictionary: pandas.DataFrame,
    max_n_variables: int = 10,
    ignore_branching_logic: bool = False,
    branching_logic: str = "",
    sep: str = "___",
) -> pandas.DataFrame:
    """:py:class:`pandas.DataFrame` : Returns a dataframe of proportional counts for variable columns.

    Parameters
    ----------
    data : pandas.DataFrame
        The incoming data.
    dictionary : pandas.DataFrame
        The data dictionary.
    max_n_variables : int, default=10
        Optional number of variables for which to take counts, defaults to
        :math:`10`.
    ignore_branching_logic : bool, default=False
        Optional flag - when ``True`` all columns of ``data`` are used;
        otherwise columns are restricted to dictionary fields whose
        ``branching_logic`` value matches ``branching_logic``. Defaults to
        ``False``.
    branching_logic : str, default=""
        Optional branching-logic expression used to select dictionary
        fields, defaults to ``""``.
    sep : str, default="___"
        Optional field-value separator, defaults to ``"___"``.

    Returns
    -------
    pandas.DataFrame
        A dataframe of proportional counts for variable columns.
    """  # noqa : E501
    if ignore_branching_logic:
        columns = list(data.columns)
    else:
        dictionary_subset = dictionary.loc[
            (dictionary["branching_logic"] == branching_logic)
        ]
        columns = [
            col for col in data.columns if col in dictionary_subset["field_name"].values
        ]
        if len(columns) == 0:
            # No column matched: fall back to the branching logic of the
            # first data column that is present in the dictionary
            branching_logic = dictionary.loc[
                dictionary["field_name"].isin(data.columns), "branching_logic"
            ]
            branching_logic = branching_logic.values[0]
            dictionary_subset = dictionary.loc[
                (dictionary["branching_logic"] == branching_logic)
            ]
            columns = [
                col
                for col in data.columns
                if col in dictionary_subset["field_name"].values
            ]
    # Proportion of positive answers among non-null answers, plus the raw
    # count used only for the initial ranking
    proportions = (
        data[columns].apply(lambda x: (x.sum() / x.count(), x.sum())).T.reset_index()
    )
    proportions.columns = ["variable", "proportion", "count"]
    proportions = proportions.sort_values(by=["count"], ascending=False).reset_index(
        drop=True
    )
    if proportions.shape[0] > max_n_variables:
        proportions = proportions.head(max_n_variables)
    # The count column is only used for ranking; present results by proportion
    proportions = proportions.drop(columns="count")
    proportions = proportions.sort_values(
        by=["proportion"], ascending=False
    ).reset_index(drop=True)
    # Attach long and short human-readable labels from the dictionary
    short_format = format_variables(dictionary, max_len=40, sep=sep)
    long_format = format_variables(dictionary, max_len=1000, sep=sep)
    format_dict = dict(zip(dictionary["field_name"], long_format))
    short_format_dict = dict(zip(dictionary["field_name"], short_format))
    proportions["label"] = proportions["variable"].map(format_dict)
    proportions["short_label"] = proportions["variable"].map(short_format_dict)
    return proportions
[docs]
def get_upset_counts_intersections(
    data: pandas.DataFrame,
    dictionary: pandas.DataFrame,
    variables: list[str] | None = None,
    n_variables: int = 5,
    sep: str = "___",
) -> tuple[pandas.DataFrame, pandas.DataFrame]:
    """:py:class:`tuple` : Returns per-variable counts and intersection counts for an upset plot.

    Parameters
    ----------
    data : pandas.DataFrame
        The incoming data; the selected columns are treated as binary
        indicators.
    dictionary : pandas.DataFrame
        The data dictionary.
    variables : list, default=None
        Optional list of names of variable columns for which to take counts,
        defaults to ``None`` (all binary columns present in ``data``).
    n_variables : int, default=5
        Optional limit for the number of variable columns, defaults to :math:`5`.
    sep : str, default="___"
        Optional field-value separator, defaults to ``"___"``.

    Returns
    -------
    tuple
        A ``(counts, intersections)`` pair of dataframes: per-variable
        totals and per-combination counts.
    """  # noqa : E501
    # Convert variables and column names into their formatted names
    long_format = format_variables(dictionary, max_len=1000, sep=sep)
    short_format = format_variables(dictionary, max_len=40, sep=sep)
    format_dict = dict(zip(dictionary["field_name"], long_format))
    short_format_dict = dict(zip(dictionary["field_name"], short_format))
    if variables is None:
        variables = data.columns.tolist()
    # Restrict to dictionary-declared binary variables that occur at least once
    binary_columns = dictionary.loc[
        (dictionary["field_type"] == "binary"), "field_name"
    ].tolist()
    variables = [col for col in variables if col in binary_columns]
    variables = [var for var in variables if data[var].sum() > 0]
    data = data[variables].astype(float).fillna(0)
    # Per-variable totals, ordered most frequent first
    counts = data.sum().astype(int).reset_index().rename(columns={0: "count"})
    counts = counts.sort_values(by="count", ascending=False).reset_index(drop=True)
    counts["short_label"] = counts["index"].map(short_format_dict)
    counts["label"] = counts["index"].map(format_dict)
    # Rank of each variable by frequency, used below for pretty ordering
    variable_order_dict = dict(zip(counts["index"], counts.index))
    variables = counts["index"].tolist()
    if n_variables is not None:
        # Keep only the n_variables most frequent variables
        variables = variables[:n_variables]
        data = data[variables]
        counts = counts.loc[counts["index"].isin(variables)]
    # Count every distinct combination of indicators among rows with at
    # least one positive value
    intersections = data.loc[data.any(axis=1)].value_counts().reset_index()
    # "index" holds the tuple of variables that are 1 in the combination
    intersections["index"] = intersections.drop(columns="count").apply(
        lambda x: tuple(col for col in x.index if x[col] == 1), axis=1
    )
    intersections["label"] = intersections["index"].apply(
        lambda x: tuple(format_dict[y] for y in x)
    )
    # The rest is reordering to make it look prettier
    intersections = intersections.loc[(intersections["count"] > 0)]
    intersections["index_n"] = intersections["index"].apply(len)
    # Frequency rank of the most/least frequent member of each combination
    intersections["index_first"] = (
        intersections[variables].idxmax(axis=1).map(variable_order_dict)
    )
    intersections["index_last"] = (
        intersections[variables].idxmin(axis=1).map(variable_order_dict)
    )
    intersections = intersections.sort_values(
        by=["count", "index_first", "index_last", "index_n"],
        ascending=[False, True, False, False],
    )
    keep_columns = ["index", "label", "count"]
    intersections = intersections[keep_columns].reset_index(drop=True)
    return counts, intersections
[docs]
def get_pyramid_data(
    data: pandas.DataFrame,
    column_dict: dict[str, str],
    left_side: str = "Female",
    right_side: str = "Male",
) -> pandas.DataFrame:
    """:py:class:`pandas.DataFrame` : Returns dual stack pyramid data.

    Groups the subjects by side / y-axis / stack columns, counts them, and
    keeps only the rows belonging to the two pyramid sides.

    Parameters
    ----------
    data : pandas.DataFrame
        The incoming data; must contain a ``"subjid"`` column.
    column_dict : dict
        Dict mapping the pyramid keys ``"side"``, ``"y_axis"`` and
        ``"stack_group"`` to data column names.
    left_side : str, default="Female"
        Optional label for the left side of the pyramid, defaults to
        ``"Female"``.
    right_side : str, default="Male"
        Optional label for the right side of the pyramid, defaults to
        ``"Male"``.

    Returns
    -------
    pandas.DataFrame
        Dual stack pyramid data with ``side``, ``y_axis``, ``stack_group``,
        ``value`` and ``left_side`` columns.
    """
    group_cols = [column_dict[key] for key in ("side", "y_axis", "stack_group")]
    # Count subjects per (side, y_axis, stack_group) combination
    pyramid = (
        data[["subjid"] + group_cols]
        .groupby(group_cols, observed=True)
        .count()
        .reset_index()
        .rename(columns={"subjid": "value"})
        .rename(columns={col: key for key, col in column_dict.items()})
    )
    # Only the two requested pyramid sides are kept
    pyramid = pyramid.loc[pyramid["side"].isin([left_side, right_side])]
    pyramid.loc[:, "left_side"] = pyramid["side"].eq(left_side)
    return pyramid.sort_values(by="y_axis", ignore_index=True)
############################################
############################################
# Logistic Regression from Risk Factors
############################################
############################################
[docs]
def get_modelling_data(
    data: pandas.DataFrame,
    dictionary: pandas.DataFrame,
    outcome_columns: str | typing.Iterable[str],
    include_sections: typing.Iterable[str] | None = None,
    required_variables: typing.Iterable[str] | None = None,
    include_types: typing.Iterable[str] | None = None,
    exclude_suffix: typing.Iterable[str] | None = None,
    include_subjid: bool = False,
    exclude_negatives: bool = True,
    fillna: bool = True,
    drop_first: bool = False,
    sep: str = "___",
) -> pandas.DataFrame:
    """:py:class:`pandas.DataFrame` : Returns modelling data.

    Parameters
    ----------
    data : pandas.DataFrame
        Incoming data.
    dictionary : pandas.DataFrame
        Data dictionary.
    outcome_columns : typing.Iterable
        Outcome columns (a single name or an iterable of names).
    include_sections : typing.Iterable, default=None
        Optional list of names of sections to include, defaults to
        ``["demog", "comor", "adsym", "vacci", "vital", "sympt", "labs"]``.
    required_variables: typing.Iterable, default=None
        Required variable column names, defaults to ``None``.
    include_types : typing.Iterable, default=None
        Optional iterable of variable type names, defaults to
        ``["binary", "categorical", "numeric"]``.
    exclude_suffix : typing.Iterable, default=None
        Optional iterable of suffixes to exclude, defaults to
        ``["_units", "addi", "otherl2", "item", "_oth", "_unlisted", "otherl3"]``.
    include_subjid : bool, default=False
        Optional boolean to indicate whether to include subject ID, defaults to
        ``False``.
    exclude_negatives : bool, default=True
        Optional boolean to indicate whether to drop negatives, defaults to
        ``True``.
    fillna : bool, default=True
        Optional boolean to fill nulls in binary columns with ``False``,
        defaults to ``True``.
    drop_first : bool, default=False
        Optional boolean passed to the onehot conversion, defaults to
        ``False``.
    sep : str, default="___"
        Optional field-value separator, defaults to ``"___"``.

    Returns
    -------
    pandas.DataFrame
        Returns the modelling data.
    """  # noqa : E501
    # None sentinels replace the previous mutable list defaults; the
    # effective default values are unchanged.
    if include_sections is None:
        include_sections = ["demog", "comor", "adsym", "vacci", "vital", "sympt", "labs"]
    if include_types is None:
        include_types = ["binary", "categorical", "numeric"]
    if exclude_suffix is None:
        exclude_suffix = [
            "_units",
            "addi",
            "otherl2",
            "item",
            "_oth",
            "_unlisted",
            "otherl3",
        ]
    _data = data.copy()
    if isinstance(outcome_columns, str):
        outcome_columns = [outcome_columns]
    include_columns = get_variables_by_section_and_type(
        _data,
        dictionary,
        required_variables=required_variables,
        include_types=include_types,
        include_subjid=include_subjid,
        include_sections=include_sections,
        exclude_suffix=exclude_suffix,
    )
    # Ensure every outcome column is present in the selection
    for outcome_column in outcome_columns:
        if outcome_column not in include_columns:
            include_columns = [outcome_column] + include_columns
    _data = _data[include_columns].dropna(axis=1, how="all").copy()
    # Convert categorical variables to onehot-encoded binary columns
    # (outcome columns are left untouched)
    categorical_ind = dictionary["field_type"] == "categorical"
    columns = dictionary.loc[categorical_ind, "field_name"].tolist()
    columns = [col for col in columns if col not in tuple(outcome_columns)]
    _data = convert_categorical_to_onehot(
        _data, dictionary, categorical_columns=columns, drop_first=drop_first, sep=sep
    )
    binary_ind = dictionary["field_type"] == "binary"
    columns = dictionary.loc[binary_ind, "field_name"].tolist()
    columns = [col for col in columns if col in _data.columns]
    if fillna is True:
        # Missing binary answers are treated as False
        with pd.option_context("future.no_silent_downcasting", True):
            _data[columns] = _data[columns].fillna(False)
    # Onehot columns that encode a negative answer carry no extra
    # information and can be dropped
    negative_values = ("no", "never smoked")
    negative_columns = [
        col for col in _data.columns if col.split(sep)[-1].lower() in negative_values
    ]
    if exclude_negatives:
        _data.drop(columns=negative_columns, inplace=True)
    return _data
[docs]
def variance_influence_factor_backwards_elimination(
    data: pandas.DataFrame,
    dictionary: pandas.DataFrame,
    predictors_list: typing.Iterable[str],
    sep: str = "___",
) -> tuple[typing.Iterable[str], pandas.DataFrame]:
    """:py:class:`tuple` : Returns an iterable of retained columns and the VIF backwards elimination data.

    Iteratively computes variance inflation factors (VIF) for the selected
    predictors and, while any VIF exceeds 10, removes the variable family
    (all columns sharing the prefix before ``sep``) of the predictor with
    the highest VIF, then recomputes.

    Parameters
    ----------
    data : pandas.DataFrame
        Incoming data.
    dictionary : pandas.DataFrame
        Data dictionary.
    predictors_list : typing.Iterable
        Iterable of predictor variable column names.
    sep : str, default="___"
        Optional field-value separator, defaults to ``"___"``.

    Returns
    -------
    tuple
        An iterable of retained columns and the VIF backwards elimination
        data (one ``vif_iter_<n>`` column per iteration).
    """  # noqa : E501
    _data = data.copy()
    # Numeric predictors present in both the data and the predictor list
    numeric_ind = dictionary["field_type"] == "numeric"
    numeric_columns = dictionary.loc[numeric_ind, "field_name"].tolist()
    numeric_columns = [col for col in numeric_columns if col in _data.columns]
    numeric_columns = [col for col in numeric_columns if col in predictors_list]
    # Binary predictors are included unscaled
    categorical_ind = dictionary["field_type"].isin(["binary"])
    categorical_columns = dictionary.loc[categorical_ind, "field_name"].tolist()
    categorical_columns = [col for col in categorical_columns if col in _data.columns]
    categorical_columns = [col for col in categorical_columns if col in predictors_list]
    # Standardise the numeric columns (z-score)
    _data[numeric_columns] = (
        _data[numeric_columns] - _data[numeric_columns].mean()
    ) / _data[numeric_columns].std()
    # Assemble the design matrix as floats
    _data = 1.0 * pd.concat(
        [_data[numeric_columns], _data[categorical_columns]], axis=1
    ).astype(float)
    keep_columns = _data.columns
    iterative_vif = pd.DataFrame(keep_columns, columns=["variable"])
    # Initial VIF for every candidate column; RuntimeWarnings from e.g.
    # perfectly collinear columns are suppressed
    with warnings.catch_warnings():
        warnings.filterwarnings("ignore", category=RuntimeWarning)
        iterative_vif["vif_iter_1"] = [
            variance_inflation_factor(_data[keep_columns].values, ii)
            for ii in range(len(keep_columns))
        ]
    n = 1
    while (iterative_vif["vif_iter_" + str(n)] > 10).any():
        # Drop the whole variable family (prefix before `sep`) of the
        # column with the largest VIF in the latest iteration
        remove_column = iterative_vif.loc[
            iterative_vif["vif_iter_" + str(n)].idxmax(), "variable"
        ]
        remove_column = remove_column.split(sep)[0]
        keep_columns = [
            col for col in keep_columns if (col.split(sep)[0] != remove_column)
        ]
        n += 1
        # Recompute VIFs on the reduced column set
        vif = pd.DataFrame(keep_columns, columns=["variable"])
        with warnings.catch_warnings():
            warnings.filterwarnings("ignore", category=RuntimeWarning)
            vif["vif_iter_" + str(n)] = [
                variance_inflation_factor(_data[keep_columns].values, ii)
                for ii in range(len(keep_columns))
            ]
        # Record this iteration's VIFs alongside the earlier iterations
        iterative_vif = pd.merge(iterative_vif, vif, how="left", on="variable")
    return keep_columns, iterative_vif
[docs]
def remove_single_binary_outcome_predictors(
    data: pandas.DataFrame,
    dictionary: pandas.DataFrame,
    predictors: typing.Iterable[str],
    outcome: str,
) -> typing.Iterable[str]:
    """:py:class:`typing.Iterable` : Returns a list of retained columns in the data after removing single binary outcome predictor columns.

    Removes binary predictors that are associated with only one outcome, e.g.
    if all patients with ``some_variable=1`` have ``outcome=1``.

    Parameters
    ----------
    data: pandas.DataFrame
        The incoming data.
    dictionary: pandas.DataFrame
        The data dictionary.
    predictors: typing.Iterable
        Iterable of predictor variable column names.
    outcome : str
        The outcome string.

    Returns
    -------
    typing.Iterable:
        List of predictor variable column names excluding any that can't be
        used in the logistic regression model.
    """  # noqa : E501
    _data = data.copy()
    # Numeric predictors are always retained; the variability check below
    # applies only to binary predictors
    numeric_ind = dictionary["field_type"] == "numeric"
    numeric_columns = dictionary.loc[numeric_ind, "field_name"].tolist()
    numeric_columns = [col for col in numeric_columns if col in _data.columns]
    numeric_columns = [col for col in numeric_columns if col in predictors]
    categorical_ind = dictionary["field_type"].isin(["binary"])
    categorical_columns = dictionary.loc[categorical_ind, "field_name"].tolist()
    categorical_columns = [col for col in categorical_columns if col in data.columns]
    categorical_columns = [col for col in categorical_columns if col in predictors]
    # For each outcome group, check that every binary predictor takes both
    # True and False values; after the transpose, `result` has one row per
    # predictor and one column per outcome level
    result = (
        _data.groupby(outcome)[categorical_columns]
        .apply(
            lambda x: x.isin([True]).any() & x.isin([False]).any(), include_groups=False
        )
        .T
    )
    # Keep only predictors that vary within every outcome group
    keep_columns = result.loc[result.all(axis=1)].index.tolist()
    keep_columns = keep_columns + numeric_columns
    return keep_columns
[docs]
def regression_summary_table(
    table: pandas.DataFrame,
    dictionary: pandas.DataFrame,
    highlight_predictors: dict[str, typing.Iterable[str]] | None = None,
    pvalue_significance: float | None = None,
    result_type: str = "OddsRatio",
    sep: str = "___",
) -> pandas.DataFrame:
    """:py:class:`pandas.DataFrame` : Returns a regression summary table.

    Parameters
    ----------
    table : pandas.DataFrame
        The incoming table; expected to contain a ``"Variable"`` column and,
        for each of the ``"multi"`` and ``"uni"`` regressions, the columns
        ``"<result_type> (...)"``, ``"LowerCI (...)"``, ``"UpperCI (...)"``
        and ``"p-value (...)"``.
    dictionary : pandas.DataFrame
        The data dictionary.
    highlight_predictors : dict, default=None
        Optional map of highlight key to iterable of variable names; each
        matching variable gets its key(s) appended to its label. Defaults to
        ``None``.
    pvalue_significance : float, default=None
        Optional :math:`p`-value significance specification forwarded to
        ``format_pvalue``, defaults to ``None``.
    result_type : str, default="OddsRatio"
        Optional name of the result column to format (e.g. ``"OddsRatio"``),
        defaults to ``"OddsRatio"``.
    sep : str, default="___"
        Optional field-value separator, defaults to ``"___"``.

    Returns
    -------
    pandas.DataFrame
        Regression summary table.
    """  # noqa : E501
    # Convert variables and column names into their formatted names.
    # Repeatedly add each variable's (non-empty) parent until the variable
    # set is closed under the parent relation (fixed point).
    variables = table["Variable"].tolist()
    new_variables = (
        variables
        + dictionary.loc[
            (
                (dictionary["field_name"].isin(variables))
                & (dictionary["parent"].isin([""]) == 0)
            ),
            "parent",
        ].tolist()
    )
    new_variables = list(set(new_variables))
    while len(variables) != len(new_variables):
        variables = new_variables
        new_variables = (
            variables
            + dictionary.loc[
                (
                    (dictionary["field_name"].isin(variables))
                    & (dictionary["parent"].isin([""]) == 0)
                ),
                "parent",
            ].tolist()
        )
        new_variables = list(set(new_variables))
    # Reorders the variables: parents with exactly one selected child add
    # no grouping value, so drop them (unless they are section headers)
    dictionary = dictionary.set_index("field_name")
    nonrepeated_parent = (
        dictionary.loc[variables]
        .groupby("parent")
        .apply(len, include_groups=False)
        .eq(1)
    )
    nonrepeated_parent = [
        p
        for p in nonrepeated_parent.loc[nonrepeated_parent].index
        if (dictionary.loc[p, "field_type"] != "section")
    ]
    variables = [var for var in variables if var not in nonrepeated_parent]
    dictionary = dictionary.reset_index()
    # Restore dictionary order for the retained variables
    variables = dictionary.loc[
        (dictionary["field_name"].isin(variables)), "field_name"
    ].tolist()
    # Format "<value> (<lower>, <upper>)" strings and p-values for both
    # regression types, then drop the raw CI columns
    for reg_type in ["multi", "uni"]:
        table[f"{result_type} ({reg_type})"] = table.apply(
            lambda x: "%.2f" % x[f"{result_type} ({reg_type})"]
            + " ("
            + "%.2f" % x[f"LowerCI ({reg_type})"]
            + ", "
            + "%.2f" % x[f"UpperCI ({reg_type})"]
            + ")",
            axis=1,
        )
        table[f"p-value ({reg_type})"] = table[f"p-value ({reg_type})"].apply(
            format_pvalue, significance=pvalue_significance
        )
        table = table.drop(columns=[f"LowerCI ({reg_type})", f"UpperCI ({reg_type})"])
    # Add empty rows for variables (e.g. parents) not present in the table,
    # then order the rows by the variable ordering computed above
    add_variables = [var for var in variables if var not in table["Variable"].tolist()]
    add_table = pd.DataFrame("", columns=table.columns, index=range(len(add_variables)))
    add_table["Variable"] = add_variables
    table = pd.concat([table, add_table], axis=0)
    table = table.set_index("Variable").loc[variables].reset_index()
    # Build the "(key)" suffix for highlighted predictors
    add_key = pd.Series("", index=table.index)
    if highlight_predictors is not None:
        for key in highlight_predictors:
            add_key.loc[table["Variable"].isin(highlight_predictors[key])] += key
        add_key = add_key.apply(lambda x: x if x == "" else f" ({x})")
    # Child variables of a selected parent get the descriptive-table label
    # style; all other variables get the plain formatted label
    formatted_labels_v1 = format_descriptive_table_variables(
        dictionary, add_key=False, sep=sep
    )
    formatted_labels_v2 = format_variables(dictionary, sep=sep)
    v1_ind = dictionary["parent"].isin(variables)
    v2_ind = dictionary["parent"].isin(variables) == 0
    mapping_dict = {
        **dict(
            zip(dictionary.loc[v1_ind, "field_name"], formatted_labels_v1.loc[v1_ind])
        ),
        **dict(
            zip(dictionary.loc[v2_ind, "field_name"], formatted_labels_v2.loc[v2_ind])
        ),
    }
    table["Variable"] = table["Variable"].map(mapping_dict)
    table["Variable"] = table["Variable"] + add_key
    return table
[docs]
def execute_glmm_regression(
    elr_dataframe_df: pandas.DataFrame,
    elr_outcome: str,
    elr_predictors: typing.Iterable[str],
    elr_groups: str,
    model_type: str = "linear",
    print_results: bool = True,
    labels: dict[str, str] | None = None,
    reg_type: str = "multi",
) -> pandas.DataFrame:
    """:py:class:`pandas.DataFrame` : Executes a mixed effects model for linear or logistic regression.

    Parameters
    ----------
    elr_dataframe_df : pandas.DataFrame
        The incoming data.
    elr_outcome : str
        Name of the response variable.
    elr_predictors : typing.Iterable
        Iterable of predictor variable names.
    elr_groups : str
        Name of the variable that defines the groups (random effect).
    model_type : str, default="linear"
        Optional regression model type - use ``"linear"`` for linear regression
        or ``"logistic"`` for logistic regression; defaults to ``"linear"``.
    print_results : bool, default=True
        Optional indicator of whether to print the results summary, defaults to
        ``True``.
    labels : dict, default=None
        Optional map of variable names to readable labels, defaults to
        ``None``.
    reg_type : str, default="multi"
        Optional regression type - ``"uni"`` for univariate, ``"multi"`` for
        multivariate. Defaults to ``"multi"``.

    Returns
    -------
    pandas.DataFrame
        The model results.

    Raises
    ------
    ValueError
        If ``model_type`` is neither ``"linear"`` nor ``"logistic"``.
    """  # noqa : E501
    # Builds the model formula, e.g. "outcome ~ a + b + c"
    elr_formula_str = elr_outcome + " ~ " + " + ".join(elr_predictors)
    # Converts predictor object columns to the 'category' dtype
    elr_categorical_vars_list = elr_dataframe_df.select_dtypes(
        include=["object", "category"]
    ).columns.intersection(elr_predictors)
    for elr_var_str in elr_categorical_vars_list:
        elr_dataframe_df[elr_var_str] = elr_dataframe_df[elr_var_str].astype("category")
    # Converts the groups column to string to ensure that the values are hashable
    elr_dataframe_df[elr_groups] = elr_dataframe_df[elr_groups].astype(str)
    model_type_lower = model_type.lower()
    if model_type_lower == "linear":
        # Mixed linear model with a random intercept per group (MixedLM)
        elr_model_obj = smf.mixedlm(
            formula=elr_formula_str,
            data=elr_dataframe_df,
            groups=elr_dataframe_df[elr_groups],
        )
        elr_result_obj = elr_model_obj.fit()
        fixed_effects = elr_result_obj.fe_params
        conf_int_df = elr_result_obj.conf_int().loc[fixed_effects.index]
        pvalues = elr_result_obj.pvalues.loc[fixed_effects.index]
        elr_summary_df = pd.DataFrame(
            {
                "Study": fixed_effects.index,
                "Coef": fixed_effects.values,
                "IC Low": conf_int_df.iloc[:, 0].values,
                "IC High": conf_int_df.iloc[:, 1].values,
                "p-value": pvalues.values,
            }
        )
    elif model_type_lower == "logistic":
        # Mixed logistic model using BinomialBayesMixedGLM (Bayesian approach
        # via variational Bayes), with a random intercept per group
        vc_formula = {elr_groups: "0 + C({})".format(elr_groups)}
        elr_model_obj = BinomialBayesMixedGLM.from_formula(
            formula=elr_formula_str, vc_formulas=vc_formula, data=elr_dataframe_df
        )
        elr_result_obj = elr_model_obj.fit_vb()
        # Extracts the fixed effect names and determines how many there are
        param_names = elr_model_obj.exog_names
        n_fixed = len(param_names)
        fixed_effects = pd.Series(elr_result_obj.params[:n_fixed], index=param_names)
        # Attempts to obtain the covariance matrix; some result objects
        # expose it as a `vcov` attribute rather than a method
        try:
            cov_params = elr_result_obj.cov_params()
        except Exception:
            cov_params = getattr(elr_result_obj, "vcov", None)
        if cov_params is not None:
            # Slice out the fixed-effects block; DataFrame and ndarray are
            # both supported
            if hasattr(cov_params, "iloc"):
                cov_params_fixed = cov_params.iloc[:n_fixed, :n_fixed]
            else:
                cov_params_fixed = cov_params[:n_fixed, :n_fixed]
            bse = pd.Series(np.sqrt(np.diag(cov_params_fixed)), index=param_names)
            # p-values via the Wald test (normal approximation)
            z_values = fixed_effects / bse
            pvalues = pd.Series(2 * (1 - norm.cdf(np.abs(z_values))), index=param_names)
        else:
            # Covariance unavailable: report NaN standard errors / p-values
            bse = pd.Series(np.full(fixed_effects.shape, np.nan), index=param_names)
            pvalues = pd.Series(np.full(fixed_effects.shape, np.nan), index=param_names)
        # 95% confidence intervals using the normal quantile 1.96
        lower_ci = fixed_effects - 1.96 * bse
        upper_ci = fixed_effects + 1.96 * bse
        # Odds ratios and corresponding intervals
        odds_ratios = np.exp(fixed_effects)
        odds_lower = np.exp(lower_ci)
        odds_upper = np.exp(upper_ci)
        elr_summary_df = pd.DataFrame(
            {
                # BUG FIX: use the parameter names (not the coefficient
                # values) for the Study column
                "Study": fixed_effects.index,
                "OddsRatio": odds_ratios.values,
                "IC Low": odds_lower.values,
                "IC High": odds_upper.values,
                "p-value": pvalues.values,
            }
        )
    else:
        raise ValueError("model_type must be 'linear' or 'logistic'")
    # Applies label mapping if provided
    if labels:
        def elr_parse_variable_name(var_name):
            # "C(var)[T.level]" -> "<label> (level)"; plain names map directly
            if var_name == "Intercept" or var_name.lower() == "const":
                return labels.get("Intercept", "Intercept")
            elif "[" in var_name:
                base_var = var_name.split("[")[0]
                level = var_name.split("[")[1].split("]")[0]
                base_var_name = base_var.replace("C(", "").replace(")", "").strip()
                label = labels.get(base_var_name, base_var_name)
                return f"{label} ({level})"
            else:
                var_name_clean = var_name.replace("C(", "").replace(")", "").strip()
                return labels.get(var_name_clean, var_name_clean)
        elr_summary_df["Study"] = elr_summary_df["Study"].apply(elr_parse_variable_name)
    # Removes the intercept row if present
    elr_summary_df = elr_summary_df[
        ~elr_summary_df["Study"].isin(["Intercept", "const"])
    ]
    # Reorders the columns and rounds the numerical values
    value_col = "OddsRatio" if model_type_lower == "logistic" else "Coef"
    elr_summary_df = elr_summary_df[
        ["Study", value_col, "IC Low", "IC High", "p-value"]
    ]
    elr_summary_df[value_col] = elr_summary_df[value_col].round(3)
    elr_summary_df["IC Low"] = elr_summary_df["IC Low"].round(3)
    elr_summary_df["IC High"] = elr_summary_df["IC High"].round(3)
    elr_summary_df["p-value"] = elr_summary_df["p-value"].apply(lambda x: f"{x:.4f}")
    # Renames the result columns with a "(uni)"/"(multi)" suffix; any
    # reg_type other than "uni" is treated as "multi", matching the
    # previous behaviour of the duplicated rename branches
    suffix = "uni" if reg_type.lower() == "uni" else "multi"
    elr_summary_df.rename(
        columns={
            value_col: f"{value_col} ({suffix})",
            "IC Low": f"LowerCI ({suffix})",
            "IC High": f"UpperCI ({suffix})",
            "p-value": f"p-value ({suffix})",
        },
        inplace=True,
    )
    if print_results:
        print(elr_summary_df)
    return elr_summary_df
[docs]
def execute_glm_regression(
    elr_dataframe_df: pandas.DataFrame,
    elr_outcome: str,
    elr_predictors: typing.Iterable,
    model_type: str = "linear",
    print_results: bool = True,
    labels: dict[str, str] | None = None,
    reg_type: str = "Multi",
) -> pandas.DataFrame:
    """:py:class:`pandas.DataFrame` : Executes a GLM (Generalized Linear Model) for linear or logistic regression.

    Parameters
    ----------
    elr_dataframe_df : pandas.DataFrame
        The incoming data.
    elr_outcome : str
        Name of the response variable.
    elr_predictors: typing.Iterable
        Iterable of predictor variable names.
    model_type : str, default="linear"
        Optional indicator regression model type - use ``"linear"`` for linear
        regression (Gaussian) or ``"logistic"`` for logistic regression
        (Binomial); defaults to ``"linear"``.
    print_results : bool, default=True
        Optional indicator of whether to print the results table, defaults to
        ``True``.
    labels : dict, default=None
        Optional map of variable names to readable labels, defaults to
        ``None``.
    reg_type : str, default="Multi"
        Optional regression type - ``"uni"`` for univariate, ``"multi"`` for
        multivariate (case insensitive). Defaults to ``"Multi"``.

    Returns
    -------
    pandas.DataFrame
        The model results.

    Raises
    ------
    ValueError
        If ``model_type`` is neither ``"linear"`` nor ``"logistic"``.
    """  # noqa : E501
    # Defines the GLM family according to model_type
    if model_type.lower() == "logistic":
        family = sm.families.Binomial()
    elif model_type.lower() == "linear":
        family = sm.families.Gaussian()
    else:
        raise ValueError("model_type must be 'linear' or 'logistic'")
    # Builds the formula, e.g. "outcome ~ a + b + c"
    formula = elr_outcome + " ~ " + " + ".join(elr_predictors)
    # Converts categorical predictor variables to 'category' type
    categorical_vars = elr_dataframe_df.select_dtypes(
        include=["object", "category"]
    ).columns.intersection(elr_predictors)
    for var in categorical_vars:
        elr_dataframe_df[var] = elr_dataframe_df[var].astype("category")
    # Fits the GLM model
    model = smf.glm(formula=formula, data=elr_dataframe_df, family=family)
    result = model.fit()
    # Extracts the coefficient table (index holds the term names)
    summary_table = result.summary2().tables[1].copy()
    # For logistic regression, calculates Odds Ratios; for linear, uses the
    # coefficients directly.
    if model_type.lower() == "logistic":
        summary_table["Odds Ratio"] = np.exp(summary_table["Coef."])
        summary_table["IC Low"] = np.exp(summary_table["[0.025"])
        summary_table["IC High"] = np.exp(summary_table["0.975]"])
        summary_df = summary_table[
            ["Odds Ratio", "IC Low", "IC High", "P>|z|"]
        ].reset_index()
        summary_df = summary_df.rename(
            columns={
                "index": "Study",
                "Odds Ratio": "OddsRatio",
                "IC Low": "LowerCI",
                "IC High": "UpperCI",
                "P>|z|": "p-value",
            }
        )
    else:
        summary_df = summary_table[["Coef.", "[0.025", "0.975]", "P>|z|"]].reset_index()
        summary_df = summary_df.rename(
            columns={
                "index": "Study",
                "Coef.": "Coefficient",
                "[0.025": "LowerCI",
                "0.975]": "UpperCI",
                "P>|z|": "p-value",
            }
        )
    # Maps variable names to readable labels, if provided
    if labels:
        def parse_variable_name(var_name):
            # "C(var)[T.level]" -> "<label> (level)"; plain names map directly
            if var_name == "Intercept":
                return labels.get("Intercept", "Intercept")
            elif "[" in var_name:
                base_var = var_name.split("[")[0]
                level = var_name.split("[")[1].split("]")[0]
                base_var_name = base_var.replace("C(", "").replace(")", "").strip()
                label = labels.get(base_var_name, base_var_name)
                return f"{label} ({level})"
            else:
                var_name_clean = var_name.replace("C(", "").replace(")", "").strip()
                return labels.get(var_name_clean, var_name_clean)
        summary_df["Study"] = summary_df["Study"].apply(parse_variable_name)
    # Reorders the columns
    value_col = "OddsRatio" if model_type.lower() == "logistic" else "Coefficient"
    summary_df = summary_df[["Study", value_col, "LowerCI", "UpperCI", "p-value"]]
    # Removes the literal "T." treatment marker from categorical levels;
    # regex=False makes the replacement literal on every pandas version
    # (without it, older pandas interprets "." as a regex wildcard)
    summary_df["Study"] = summary_df["Study"].str.replace("T.", "", regex=False)
    # Formats the numerical values
    for col in summary_df.columns[1:-1]:
        summary_df[col] = summary_df[col].round(3)
    summary_df["p-value"] = summary_df["p-value"].apply(lambda x: f"{x:.4f}")
    # Removes the intercept row
    summary_df = summary_df[summary_df["Study"] != "Intercept"]
    # Renames the result columns with a "(uni)"/"(multi)" suffix; any other
    # reg_type leaves the columns unrenamed (matching the previous
    # behaviour of the duplicated if/elif rename branches)
    if reg_type.lower() in ("uni", "multi"):
        suffix = reg_type.lower()
        summary_df.rename(
            columns={
                value_col: f"{value_col} ({suffix})",
                "LowerCI": f"LowerCI ({suffix})",
                "UpperCI": f"UpperCI ({suffix})",
                "p-value": f"p-value ({suffix})",
            },
            inplace=True,
        )
    if print_results:
        print(summary_df)
    return summary_df
[docs]
def execute_cox_model(
    data: pandas.DataFrame,
    duration_col: str,
    event_col: str,
    predictors: typing.Iterable[str],
    labels: dict[str, str] | None = None,
) -> pandas.DataFrame:
    """:py:class:`pandas.DataFrame` : Executes a Cox Proportional Hazards model without weights and returns a summary of the results.

    Parameters
    ----------
    data : pandas.DataFrame
        The incoming data. The input frame is not modified.
    duration_col : str
        Name of the time variable.
    event_col : str
        Name of the outcome variable (binary event).
    predictors : typing.Iterable
        Names of predictor variables.
    labels : dict, default=None
        Dictionary mapping variable names to readable labels, default=``None``.

    Returns
    -------
    pandas.DataFrame
        The model results: hazard ratio, rounded p-value and 95% CI per
        covariate.
    """  # noqa : E501
    # Work on a copy so the caller's DataFrame is never mutated, and
    # materialize predictors in case a one-shot generator was passed in
    # (it is consumed several times below).
    data = data.copy()
    predictors = list(predictors)
    # Ensure categorical variables are treated appropriately
    categorical_vars = data.select_dtypes(
        include=["object", "category"]
    ).columns.intersection(predictors)
    for var in categorical_vars:
        data[var] = data[var].astype("category")
    # Convert categorical variables to dummies
    data = pd.get_dummies(data, columns=categorical_vars, drop_first=True)
    # Ensure numerical variables have the correct type
    data[duration_col] = pd.to_numeric(data[duration_col], errors="coerce")
    data[event_col] = pd.to_numeric(data[event_col], errors="coerce")
    # Update predictors to include one-hot encoded columns
    predictors = [
        c
        for c in data.columns
        if c in predictors or any(c.startswith(p + "_") for p in categorical_vars)
    ]
    # Remove rows with missing values in essential columns
    data = data.dropna(subset=[duration_col, event_col] + predictors)
    # Select relevant columns and fit the Cox model
    df_cox = data[[duration_col, event_col] + predictors]
    cph = CoxPHFitter()
    cph.fit(df_cox, duration_col=duration_col, event_col=event_col)
    # Model summary: hazard ratios and 95% Wald confidence intervals
    summary = cph.summary
    summary["HR"] = np.exp(summary["coef"])
    summary["CI_lower"] = np.exp(summary["coef"] - 1.96 * summary["se(coef)"])
    summary["CI_upper"] = np.exp(summary["coef"] + 1.96 * summary["se(coef)"])
    summary["p_adj"] = summary["p"].apply(lambda p: round(p, 3))
    # Select relevant columns for the final summary
    summary_df = summary[["HR", "p_adj", "CI_lower", "CI_upper"]].reset_index()
    # lifelines names its summary index "covariate" (older versions produced
    # "index" after reset_index); rename whichever column came out first so
    # the output schema is stable across versions.
    summary_df.rename(
        columns={summary_df.columns[0]: "Variable", "p_adj": "p-value"},
        inplace=True,
    )
    # Replace variable labels if provided
    if labels:
        summary_df["Variable"] = (
            summary_df["Variable"].map(labels).fillna(summary_df["Variable"])
        )
    return summary_df
[docs]
def execute_kaplan_meier(
    data: pandas.DataFrame,
    duration_col: str,
    event_col: str,
    group_col: str,
    alpha=0.05,
    n_times=5,
) -> tuple[pandas.DataFrame, pandas.DataFrame, float]:
    """:py:class:`tuple` : Executes the Kaplan-Meier model and returns the results.

    Fits one Kaplan-Meier curve per value of ``group_col``, merges the
    survival percentages and confidence bounds of all groups into a single
    frame on a shared timeline, runs a log-rank test across the groups and
    builds a number-at-risk table on a regular time grid.

    Parameters
    ----------
    data : pandas.DataFrame
        The incoming data.
    duration_col : str
        Name of the time variable.
    event_col : str
        Name of the outcome variable (binary event).
    group_col : str
        Name of the grouping column.
    alpha : float, default=0.05
        Significance level used for the confidence intervals, defaults to
        :math:`0.05`.
    n_times : int, default=5
        Step size of the risk-table time grid, defaults to :math:`5`.

    Returns
    -------
    tuple
        A tuple consisting of the model results, risk table and the
        :math:`p`-value (``numpy.nan`` when fewer than two groups exist).
    """  # noqa : E501
    # Remove rows with missing values in relevant columns
    data = data.dropna(subset=[duration_col, event_col, group_col])
    kmf = KaplanMeierFitter()
    unique_groups = data[group_col].sort_values().unique()
    df_km = pd.DataFrame(columns=["timeline"])
    # NOTE(review): n_times is used both as the grid step (arange below) and
    # inside this rounding formula, so the upper bound is roughly the maximum
    # duration scaled by (n_times + 1) / n_times -- confirm this is the
    # intended end of the risk-table grid.
    max_time = (data[duration_col].max() // n_times) * (n_times + 1)
    times = np.arange(0, max_time, n_times)
    # Compute survival curves and confidence intervals for each group
    for group in unique_groups:
        group_data = data[data[group_col] == group]
        kmf.fit(
            group_data[duration_col],
            event_observed=group_data[event_col],
            label=str(group),
            alpha=alpha,
        )
        # lifelines names its CI columns "<label>_lower_<1 - alpha>"; the
        # survival probabilities are rescaled to percentages here.
        ci_lower = kmf.confidence_interval_[f"{group}_lower_{1 - alpha}"] * 100
        ci_upper = kmf.confidence_interval_[f"{group}_upper_{1 - alpha}"] * 100
        survival_curve = pd.concat(
            [kmf.survival_function_ * 100, ci_lower, ci_upper], axis=1
        )
        survival_curve = (
            survival_curve.drop_duplicates()
            .reset_index()
            .rename(columns={"index": "timeline"})
        )
        # Outer-merge on the shared timeline; bfill() fills the gaps created
        # by event times that exist in one group but not another.
        df_km = pd.merge(df_km, survival_curve, on="timeline", how="outer").bfill()
    # Perform log-rank test: pairwise for two groups, multivariate otherwise
    if len(unique_groups) == 2:
        group1_data = data[data[group_col] == unique_groups[0]]
        group2_data = data[data[group_col] == unique_groups[1]]
        result = logrank_test(
            group1_data[duration_col],
            group2_data[duration_col],
            event_observed_A=group1_data[event_col],
            event_observed_B=group2_data[event_col],
        )
        p_value = result.p_value
    elif len(unique_groups) > 2:
        result = multivariate_logrank_test(
            data[duration_col], data[group_col], data[event_col]
        )
        p_value = result.p_value
    else:
        # A single group (or none): no comparison is possible
        p_value = np.nan
    # Generate risk table: number of individuals at risk over time
    risk_counts = {
        group: [
            ((data[group_col] == group) & (data[duration_col] >= t)).sum()
            for t in times
        ]
        for group in unique_groups
    }
    risk_table = pd.DataFrame(risk_counts, index=times).T
    risk_table.insert(0, "Group", risk_table.index)
    risk_table.reset_index(drop=True, inplace=True)
    return df_km, risk_table, p_value
############################################
############################################
# FEATURE SELECTION
# (includes basic imputation, variance check, correlation check)
############################################
############################################
[docs]
def impute_miss_val(
    data: pandas.DataFrame,
    dictionary: pandas.DataFrame,
    outcome_column: str = "outco_binary_outcome",
    missing_threshold: float = 0.7,
    verbose: bool = False,
) -> pandas.DataFrame:
    """:py:class:`pandas.DataFrame` : The data with missing values imputed.

    Imputes missing values or drops columns based on missing value proportion
    and median. Numeric columns are filled with their median, binary and
    categorical columns with their mode.

    Parameters
    ----------
    data : pandas.DataFrame
        The incoming data.
    dictionary : pandas.DataFrame
        The data dictionary (``field_name`` / ``field_type`` columns).
    outcome_column : str, default="outco_binary_outcome"
        Optional outcome column, defaults to ``"outco_binary_outcome"``.
    missing_threshold : float, default=0.7
        A proportional imputation threshold for missing values, defaults to
        :math:`0.7`. Columns missing more than this are dropped.
    verbose : bool, default=False
        Optional indicator of whether to print imputations summary, defaults
        to ``False``.

    Returns
    -------
    pandas.DataFrame
        Data with missing values imputed or columns dropped.
    """  # noqa : E501
    keep_columns = ["subjid", outcome_column]

    def _fields_of_type(types: list[str]) -> list[str]:
        # Dictionary fields with one of `types` that are present in the data.
        ind = dictionary["field_type"].isin(types)
        fields = dictionary.loc[ind, "field_name"].tolist()
        return [col for col in fields if col in data.columns]

    numeric_columns = _fields_of_type(["numeric"])
    binary_columns = _fields_of_type(["binary"])
    # BUG FIX: this previously looked up "binary" again, so genuinely
    # categorical fields were never imputed. (Assumes the dictionary uses
    # "categorical" as the type token -- confirm against the data dictionary.)
    categorical_columns = _fields_of_type(["categorical"])
    # Calculate the proportion of missing values in each column
    missing_proportions = data.isnull().mean()
    # Identify and drop columns missing too much data (protected columns stay)
    columns_to_drop = [
        col
        for col in missing_proportions[missing_proportions > missing_threshold].index
        if col not in keep_columns
    ]
    data = data.drop(columns=columns_to_drop)
    # Impute missing values in remaining columns
    for col in (c for c in data.columns if c not in keep_columns):
        if col in numeric_columns:
            data[col] = data[col].fillna(data[col].median())
        elif col in (binary_columns + categorical_columns):
            mode = data[col].mode()
            # An all-NaN column has an empty mode; leave it untouched rather
            # than raising an IndexError.
            if not mode.empty:
                data[col] = data[col].fillna(mode.iloc[0])
    if verbose:
        print("\nSummary after Imputation")
        print("Size of remaining data:", data.shape)
    return data
[docs]
def rmv_low_var(
    data: pandas.DataFrame,
    dictionary: pandas.DataFrame,
    mad_threshold: float = 0.1,
    freq_threshold: float = 0.05,
    outcome_column: str = "outco_binary_outcome",
    verbose: bool = False,
) -> pandas.DataFrame:
    """:py:class:`pandas.DataFrame` : Removes numerical variables from the data with Median Absolute Deviation (MAD) below a given threshold.

    Excludes binary columns from MAD calculation. Removes binary columns with
    very low minority-class frequencies, and constant (single-valued) columns.

    Parameters
    ----------
    data : pandas.DataFrame
        The incoming data.
    dictionary : pandas.DataFrame
        The data dictionary (``field_name`` / ``field_type`` columns).
    mad_threshold : float, default=0.1
        Optional MAD threshold, defaults to :math:`0.1`.
    freq_threshold : float, default=0.05
        Optional frequency threshold, defaults to :math:`0.05`.
    outcome_column : str, default="outco_binary_outcome"
        Optional outcome column, defaults to ``"outco_binary_outcome"``.
    verbose : bool, default=False
        Optional indicator of whether to print MAD analysis summary.

    Returns
    -------
    pandas.DataFrame
        The data with constant, low-frequency binary, and low-MAD numeric
        columns removed.
    """  # noqa : E501
    keep_columns = ["subjid", outcome_column]
    numeric_ind = dictionary["field_type"] == "numeric"
    numeric_columns = dictionary.loc[numeric_ind, "field_name"].tolist()
    numeric_columns = [col for col in numeric_columns if col in data.columns]
    binary_ind = dictionary["field_type"] == "binary"
    binary_columns = dictionary.loc[binary_ind, "field_name"].tolist()
    binary_columns = [col for col in binary_columns if col in data.columns]
    # Remove single-valued columns first
    single_value_columns = data.columns[data.nunique() == 1]
    single_value_columns = [
        col for col in single_value_columns if col not in keep_columns
    ]
    data = data.drop(columns=single_value_columns)
    # BUG FIX: re-filter the dictionary-derived column lists against the
    # surviving columns; previously a constant numeric/binary column that had
    # just been dropped caused a KeyError in the selections below.
    numeric_columns = [col for col in numeric_columns if col in data.columns]
    binary_columns = [col for col in binary_columns if col in data.columns]
    # Minority-class frequency of each binary column (mean of 0/1 values,
    # folded to the smaller side)
    binary_counts = data[binary_columns].mean(axis=0)
    binary_counts = binary_counts.apply(lambda x: min((x, 1 - x)))
    exclude_binary_columns = [
        col for col in binary_columns if (binary_counts[col] < freq_threshold)
    ]
    # MAD of each numeric column after scaling by its maximum absolute value
    mad_series = (
        data[numeric_columns]
        .apply(lambda x: x / x.abs().max(), axis=0)
        .apply(lambda x: np.median(np.abs(x - np.median(x))))
    )
    exclude_numeric_columns = [
        col for col in numeric_columns if (mad_series[col] < mad_threshold)
    ]
    # Keep every column that is neither a low-frequency binary column nor a
    # low-MAD numeric column
    exclude_columns = exclude_binary_columns + exclude_numeric_columns
    data = data[[col for col in data.columns if col not in exclude_columns]]
    if verbose:
        print("\nMAD Analysis Summary:")
        print(f"Single value columns removed: {len(single_value_columns)}")
        print(f"Total binary columns: {len(binary_columns)}")
        print(f"Total numeric columns: {len(numeric_columns)}")
        print(f"""High Frequency Binary columns kept: \
{len(binary_columns) - len(exclude_binary_columns)}""")
        print(f"""Columns removed due to low MAD: \
{len(numeric_columns) - len(exclude_numeric_columns)}""")
    return data
[docs]
def rmv_high_corr(
    data: pandas.DataFrame,
    dictionary: pandas.DataFrame,
    outcome_column: str = "outco_binary_outcome",
    correlation_threshold: float = 0.5,
    verbose: bool = False,
) -> pandas.DataFrame:
    """:py:class:`pandas.DataFrame` : Removes variables in the data with high multicollinearity.

    Whenever two numeric variables correlate above the threshold, the one
    appearing later in column order is arbitrarily dropped. The ID and
    outcome columns are always kept.

    Parameters
    ----------
    data : pandas.DataFrame
        The incoming data.
    dictionary : pandas.DataFrame
        The data dictionary (``field_name`` / ``field_type`` columns).
    outcome_column : str, default="outco_binary_outcome"
        Optional outcome column, defaults to ``"outco_binary_outcome"``.
    correlation_threshold : float, default=0.5
        Optional correlation threshold, defaults to :math:`0.5`.
    verbose : bool, default=False
        Optional indicator of whether to print correlation summary.

    Returns
    -------
    pandas.DataFrame
        The data with high correlation variables removed.
    """  # noqa : E501
    protected = ["subjid", outcome_column]
    # Numeric fields per the dictionary that actually exist in the data
    is_numeric = dictionary["field_type"] == "numeric"
    numeric_cols = [
        name
        for name in dictionary.loc[is_numeric, "field_name"].tolist()
        if name in data.columns
    ]
    # Absolute pairwise correlations between the numeric columns
    abs_corr = data[numeric_cols].corr().abs()
    # Mask everything except the strict upper triangle so each pair is
    # inspected exactly once
    upper_mask = np.triu(np.ones(abs_corr.shape), 1).astype(bool)
    upper = abs_corr.where(upper_mask)
    # A column is dropped when it correlates above the threshold with any
    # column that precedes it (and it is not protected)
    to_drop = [
        name
        for name in upper.columns
        if (upper[name] > correlation_threshold).any() and name not in protected
    ]
    data = data.drop(columns=to_drop)
    if verbose:
        print("\nCORR Summary")
        print(f"Columns removed due to high correlation: \
{len(to_drop)}")
    return data
[docs]
def lasso_var_sel_binary(
    data: pandas.DataFrame,
    outcome_column: str = "mapped_outcome",
    metric: str = "balanced_accuracy",
    threshold: float = 1e-3,
    gridsearch_params: dict[str, typing.Iterable[float]] | None = None,
    random_state: int = 42,
    verbose: bool = False,
    sep: str = "___",
) -> tuple[typing.Any, ...]:
    """:py:class:`tuple` : Prepares data and selects features using binary logistic regression with elastic net penalty.

    Specifically designed for binary outcomes only. Numeric predictors are
    standardized, two-level categoricals are label-encoded, multi-level
    categoricals are one-hot encoded, and an elastic-net
    ``LogisticRegressionCV`` grid over ``Cs`` x ``l1_ratios`` selects the
    features with non-zero coefficients.

    Parameters
    ----------
    data : pandas.DataFrame
        The incoming data.
    outcome_column : str, default="mapped_outcome"
        Optional outcome column, defaults to ``"mapped_outcome"``.
    metric : str, default="balanced_accuracy"
        Optional CV scoring metric, defaults to ``"balanced_accuracy"``.
    threshold : float, default=1e-3
        Optional coefficient threshold for the parameter ranking, defaults
        to :math:`0.001`.
    gridsearch_params : dict, default=None
        Optional grid search params with keys ``"l1_ratios"`` and ``"Cs"``.
        ``None`` (the default) uses:
        ::
            {
                "l1_ratios": [0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9],
                "Cs": numpy.logspace(-3, 1, 9)
            }
    random_state : int, default=42
        Optional random state, defaults to :math:`42`.
    verbose : bool, default=False
        Optional indicator of whether to print the analysis, defaults to
        ``False``.
    sep : str, default="___"
        Optional field-value separator, defaults to ``"___"``.

    Returns
    -------
    tuple
        ``(results_df, scores_df, main_fields_df, top_params, X_selected,
        y, selected_features, coef_df, label_encoder, feature_importance,
        metrics)``.
    """  # noqa : E501
    # BUG FIX: the grid was previously hard-coded in the body and the
    # `gridsearch_params` argument silently ignored; a `None` sentinel also
    # avoids the mutable-default-argument pitfall.
    if gridsearch_params is None:
        gridsearch_params = {
            "l1_ratios": [0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9],
            "Cs": np.logspace(-3, 1, 9),
        }
    l1_vec = list(gridsearch_params["l1_ratios"])
    C_vec = np.asarray(list(gridsearch_params["Cs"]))
    if outcome_column not in data.columns:
        error_str = f"Outcome column {outcome_column} not found in DataFrame"
        raise ValueError(error_str)
    # Separate predictors and outcome
    y = data[outcome_column].copy()
    X_ini = data.drop(columns=[outcome_column])
    if verbose:
        print(f"\nInitial shape of X: {X_ini.shape}")
    # Encode the binary outcome
    label_encoder = LabelEncoder()
    y = label_encoder.fit_transform(y)
    # Verify that we have a binary outcome
    n_classes = len(np.unique(y))
    if n_classes != 2:
        error_str = """This function is designed for binary classification \
only. More than two classes found."""
        raise ValueError(error_str)
    # Standardize numeric features only
    scaler = StandardScaler()
    numeric_cols = X_ini.select_dtypes(include=[np.number]).columns
    if verbose:
        print("Column dtypes:", X_ini.dtypes)
        print("Numeric columns found:", len(numeric_cols))
    X = X_ini.copy()
    if len(numeric_cols) > 0:
        X[numeric_cols] = scaler.fit_transform(X_ini[numeric_cols])
    elif verbose:
        print("No numeric columns to standardize")
    if verbose:
        outcome_classes = dict(
            zip(label_encoder.classes_, range(len(label_encoder.classes_)))
        )
        print("\nOutcome classes:", outcome_classes)
        print(f"\nInitial shape of X: {X.shape}")
    # Encode categorical predictors: label-encode two-level ones, one-hot
    # encode the rest
    categorical_cols = X.select_dtypes(include=["object", "category"]).columns
    binary_cats = [col for col in categorical_cols if X[col].nunique() == 2]
    multi_cats = [col for col in categorical_cols if X[col].nunique() > 2]
    for col in binary_cats:
        le = LabelEncoder()
        X[col] = le.fit_transform(X[col])
    # Use dummies only for multi-category
    X = pd.get_dummies(X, columns=multi_cats, prefix_sep=sep)
    if verbose:
        print(f"\nshape of X after one-hot: {X.shape}")
    if X.shape[1] > 0:
        if verbose:
            print("First actual predictor column:", X.columns[0])
    else:
        error_str = """No predictor columns left after dropping outcome (and \
ID if applicable)."""
        raise ValueError(error_str)
    # Fit binary logistic regression with elastic net. For binary
    # classification LogisticRegressionCV yields a single coefficient row.
    logistic = LogisticRegressionCV(
        penalty="elasticnet",
        l1_ratios=l1_vec,
        solver="saga",
        cv=StratifiedKFold(n_splits=3, shuffle=True, random_state=random_state),
        random_state=random_state,
        max_iter=5000,
        class_weight="balanced",
        Cs=C_vec,
        tol=1e-4,
        scoring=metric,
    )
    logistic.fit(X, y)
    if verbose:
        print("Original l1_ratios specified:", l1_vec)
        print("Model's l1_ratios attribute:", logistic.l1_ratios)
        print("Shape of logistic.scores_[1]:", logistic.scores_[1].shape)
        print("\nFirst 10 scores for each row:")
        for i in [0, 1]:  # explicitly loop through both rows
            print(f"\nRow {i} (supposed to be l1_ratio = {l1_vec[i]}):")
            print(logistic.scores_[1][0, :10, i])
    # Mean CV score per (C, l1_ratio) combination, averaged over folds.
    # NOTE(review): scores_ is keyed by the positive class label, assumed to
    # be 1 after label encoding -- confirm for non-0/1 outcomes.
    scores_dt = np.mean(logistic.scores_[1], axis=0)
    if verbose:
        print("Mean scores for varying l1_ratio and Cs")
        print(scores_dt)
    scores_df = pd.DataFrame(
        scores_dt,
        index=C_vec,
        columns=l1_vec,
    )
    # Label the axes
    scores_df.index.name = "C"
    scores_df.columns.name = "l1_ratio"
    # logistic.coef_ has shape (1, n_features) for binary classification
    coef_df = pd.DataFrame(logistic.coef_, columns=X.columns)
    # Feature importance is the absolute value of the single coefficient row
    feature_importance = np.abs(coef_df.iloc[0, :])
    # Select features with non-zero importance
    selected_features = feature_importance[(feature_importance > 0.0)].index.tolist()
    # Predictions on the training data
    y_pred = logistic.predict(X)
    # Find the best C and the corresponding mean CV score
    best_c = logistic.C_[0]
    c_index = np.where(logistic.Cs_ == best_c)[0][0]
    all_class_scores = []
    for cl in logistic.scores_:
        all_class_scores.extend(logistic.scores_[cl][:, c_index])
    best_cv_score = np.mean(all_class_scores)
    conf_matrix = confusion_matrix(y, y_pred)
    target_names = [str(c) for c in label_encoder.classes_]
    X_selected = X[selected_features]
    if verbose:
        print("\nPerformance Metrics:")
        print("-------------------")
        print(f"Best C value: {best_c}")
        print("\nConfusion Matrix:")
        print(conf_matrix)
        print("\nClassification Report:")
        print(classification_report(y, y_pred, target_names=target_names))
        print(f"Best CV score: {best_cv_score}")
        # l1_ratio_ returns the best ratio found for each class; for binary
        # there is exactly one:
        print(f"Best l1_ratio: {logistic.l1_ratio_[0]}")
        print(f"\nSelected {len(selected_features)} features")
        print("\nFeature importance for selected features:")
        sorted_features = sorted(
            selected_features, key=lambda x: feature_importance[x], reverse=True
        )
        for feat in sorted_features:
            print(f"{feat}: {feature_importance[feat]:.4f}")
        print(f"Final shape of selected features: {X_selected.shape}")
    # Store metrics in a dictionary.
    # NOTE(review): target_names here are the raw encoder classes (possibly
    # non-string) -- confirm classification_report accepts them for this data.
    report = classification_report(
        y, y_pred, target_names=label_encoder.classes_, output_dict=True
    )
    metrics = {
        "confusion_matrix": conf_matrix,
        "classification_report": report,
        "accuracy": accuracy_score(y, y_pred),
        "cv_scores": all_class_scores,
    }
    # Create grouped results showing all categories under their main field
    grp_results = create_grouped_results(selected_features, feature_importance)
    results_df = grp_results[0]
    sorted_fields = grp_results[1]
    if verbose:
        print("\nSelected features grouped by main field:")
        print(results_df)
    # Main fields in importance order (one-hot categories already collapsed
    # onto their parent field by create_grouped_results)
    main_fields_df = pd.DataFrame({"Main Features": list(sorted_fields)})
    top_params = get_parameter_ranking(logistic, n_top=20, threshold=threshold)
    if verbose:
        print(main_fields_df)
        print("\nTop parameter combinations:")
        print(top_params)
    return (
        results_df,
        scores_df,
        main_fields_df,
        top_params,
        X_selected,
        y,
        selected_features,
        coef_df,
        label_encoder,
        feature_importance,
        metrics,
    )
[docs]
def create_grouped_results(
    selected_features: typing.Iterable[str],
    feature_importance: dict[str, float],
    sep: str = "___",
) -> tuple[pd.DataFrame, typing.Iterable[str], typing.Iterable[str]]:
    """:py:class:`tuple` : Creates and returns grouped feature results.

    Builds a display DataFrame in which one-hot encoded categories are
    listed under their parent field (parent rows carry ``"..."`` as the
    coefficient), with fields ordered by their largest absolute coefficient.

    Parameters
    ----------
    selected_features : typing.Iterable
        An iterable of selected feature names; one-hot encoded names have
        the form ``"<field><sep><category>"``.
    feature_importance : dict
        A dict of feature names and coefficient weights.
    sep : str, default="___"
        Optional field-value separator, defaults to ``"___"``.

    Returns
    ----------
    tuple
        ``(results_df, sorted_fields, categorical_fields)``.
    """  # noqa : E501
    # Split the features into one-hot encoded ones (tracked by their parent
    # field) and plain ones
    onehot_parents = set()
    plain_features = []
    for feat in selected_features:
        if sep in feat:
            onehot_parents.add(feat.split(sep)[0])
        else:
            plain_features.append(feat)
    # Collect every category entry under its parent field
    grouped = {}
    for parent in onehot_parents:
        prefix = parent + sep
        grouped[parent] = [
            {
                "Feature": feat.split(sep)[1],
                "Coefficient": feature_importance[feat],
                "AbsCoef": abs(feature_importance[feat]),
            }
            for feat in selected_features
            if feat.startswith(prefix)
        ]
    # Plain features form their own single-entry group
    for feat in plain_features:
        weight = feature_importance[feat]
        grouped[feat] = [
            {"Feature": "", "Coefficient": weight, "AbsCoef": abs(weight)}
        ]
    # Order fields by the strongest coefficient within each group
    peak_abs = {
        name: max(entry["AbsCoef"] for entry in entries)
        for name, entries in grouped.items()
    }
    sorted_fields = sorted(grouped, key=peak_abs.get, reverse=True)
    # Assemble the display rows
    rows = []
    for name in sorted_fields:
        if name in onehot_parents:
            # Parent header row, then its categories by descending magnitude
            rows.append({"Feature": name, "Coefficient": "..."})
            ordered = sorted(
                grouped[name], key=lambda entry: entry["AbsCoef"], reverse=True
            )
            rows.extend(
                {
                    # Indent for visual grouping
                    "Feature": " " + entry["Feature"],
                    "Coefficient": entry["Coefficient"],
                }
                for entry in ordered
            )
        else:
            rows.append(
                {"Feature": name, "Coefficient": grouped[name][0]["Coefficient"]}
            )
    return pd.DataFrame(rows), sorted_fields, onehot_parents
[docs]
def get_parameter_ranking(
    logistic: typing.Any, n_top: int = 10, threshold: float = 1e-3
) -> pandas.DataFrame:
    """:py:class:`pandas.DataFrame` : Returns a dataframe of rankings of parameter combinations using stored scores and coefficient paths.

    For every ``(l1_ratio, C)`` combination the mean CV score and, per fold,
    the number of coefficients whose magnitude exceeds ``threshold`` are
    tabulated; rows are ranked by score and de-duplicated on the first
    fold's feature count.

    Parameters
    ----------
    logistic : typing.Any
        A fitted elastic-net ``LogisticRegressionCV``-like model exposing
        ``l1_ratios_``, ``Cs_``, ``scores_`` and ``coefs_paths_``.
    n_top : int, default=10
        Optional number of top ranking combinations to keep, defaults to
        :math:`10`.
    threshold : float, default=1e-3
        Optional coefficient-magnitude threshold, defaults to :math:`0.001`.

    Returns
    -------
    pandas.DataFrame
        The dataframe of parameter rankings with columns ``l1_ratio``,
        ``C``, ``score`` and one ``n_features<fold>`` column per CV fold.
    """  # noqa : E501
    param_scores = []
    # Get model parameters
    l1_ratios = logistic.l1_ratios_
    Cs = logistic.Cs_
    # Get first class key (for binary classification there is only one set
    # of scores)
    first_class = list(logistic.scores_.keys())[0]
    # coefs_paths_ is (n_folds, n_Cs, n_l1_ratios, n_features) for an
    # elastic-net LogisticRegressionCV.
    # BUG FIX: the fold axis was previously hard-coded to exactly 3 folds;
    # derive it from the array so any CV split count works.
    coef_paths = logistic.coefs_paths_[first_class]
    n_folds = coef_paths.shape[0]
    # Loop through all parameter combinations
    for l1_ratio_idx, l1_ratio in enumerate(l1_ratios):
        for C_idx, C in enumerate(Cs):
            # Mean score across folds for this parameter combination
            score = logistic.scores_[first_class][:, C_idx, l1_ratio_idx].mean()
            row = {"l1_ratio": l1_ratio, "C": C, "score": score}
            # Count non-zero coefficients per fold
            for fold in range(n_folds):
                coefs = coef_paths[fold, C_idx, l1_ratio_idx, :]
                row[f"n_features{fold}"] = int(np.sum(np.abs(coefs) > threshold))
            param_scores.append(row)
    # Convert to DataFrame, rank by score, keep one row per distinct
    # first-fold feature count
    params_df = pd.DataFrame(param_scores)
    params_df = (
        params_df.sort_values("score", ascending=False)
        .drop_duplicates("n_features0")
        .head(n_top)
    )
    return params_df