from __future__ import annotations
__all__ = [
"add_answer_dict",
"add_onehot_variables",
"combine_unlisted_variables",
"convert_dictionary_field_type",
"convert_onehot_to_binary",
"get_branching_logic_variables",
"get_data_dictionary",
"get_df_forms",
"get_df_map",
"get_events_and_forms_info",
"get_form_event",
"get_label",
"get_labels",
"get_missing_data_codes",
"get_records",
"get_redcap_data",
"get_section_prefix",
"get_value",
"get_values",
"harmonise_age",
"homogenise_variables",
"initial_data_processing",
"is_unlisted_item",
"is_yesno",
"is_yesno_question",
"list_categorical_onehot_columns",
"list_checkbox_onehot_columns",
"load_countries_table",
"load_units_conversion_table",
"map_variable",
"rename_checkbox_variables",
"replace_with_nan_for_missing_code_checkbox",
"resolve_checkbox_branching_logic",
"user_assigned_to_dag",
]
# -- IMPORTS --
# -- Standard libraries --
import io
import time
import typing
import warnings
from pathlib import Path
# -- 3rd party libraries --
import numpy as np
import pandas
import requests
# -- Internal libraries --
from isaricanalytics.logging.logger import setup_logger
logger = setup_logger(__name__)
pd = pandas # An alias to allow Pandas code refs to work independently
# of Pandas Intersphinx refs in type hinting and docstrings
############################################
# API-calling functions
############################################
[docs]
def user_assigned_to_dag(redcap_url: str, redcap_api_key: str) -> bool:
    """:py:class:`bool` : Whether the user is assigned to a data access group (DAG).

    A DAG export is attempted; an HTTP 403 (forbidden) response is
    interpreted as the requesting user being assigned to a DAG.

    Parameters
    ----------
    redcap_url : str
        REDCap URL.
    redcap_api_key : str
        REDCap API key.

    Returns
    -------
    bool
        Whether the user is assigned to a REDCap DAG.
    """
    payload = {
        "token": redcap_api_key,
        "content": "dag",
        "format": "csv",
        "returnFormat": "json",
    }
    response = requests.post(redcap_url, data=payload)
    return response.status_code == 403
[docs]
def get_records(
    redcap_url: str,
    redcap_api_key: str,
    data_access_groups: typing.Iterable[str] | None = None,
    user_assigned_to_dag: bool = False,
) -> pandas.DataFrame:
    """:py:class:`pandas.DataFrame` : Returns a dataframe of records from the REDCap API.

    Parameters
    ----------
    redcap_url : str
        REDCap URL.
    redcap_api_key : str
        REDCap API key.
    data_access_groups : typing.Iterable, default=None
        An iterable of data access group names.
    user_assigned_to_dag : bool, default=False
        Whether the user is assigned to a data access group (DAG). NOTE:
        this parameter shadows the module-level function of the same name;
        it holds that function's boolean result.

    Returns
    -------
    pandas.DataFrame
        Records from the REDCap API data (``None`` if every DAG-scoped
        export failed).
    """  # noqa : E501
    started = time.perf_counter()
    if (data_access_groups is None) or (user_assigned_to_dag is False):
        # Single export of all records; optionally filter by DAG locally
        logger.info("REDCap records export: requesting all records")
        conex = {
            "token": redcap_api_key,
            "content": "record",
            "action": "export",
            "format": "csv",
            "type": "flat",
            "csvDelimiter": "",
            "rawOrLabel": "label",
            "rawOrLabelHeaders": "raw",
            "exportCheckboxLabel": "false",
            "exportSurveyFields": "false",
            "exportDataAccessGroups": "true",
            "returnFormat": "json",
        }
        response = requests.post(redcap_url, data=conex)
        logger.debug("HTTP Status: " + str(response.status_code))
        data = pd.read_csv(
            io.StringIO(response.text), dtype={"subjid": "str"}, keep_default_na=False
        )
        if data_access_groups is not None:
            ind = data["redcap_data_access_group"].isin(data_access_groups)
            data = data.loc[ind].reset_index(drop=True)
    else:
        # DAG-assigned users must switch into each DAG and export separately
        logger.info(
            "REDCap records export: requesting DAG-scoped records for "
            f"{len(data_access_groups)} DAG(s)"
        )
        df_list = []
        for dag in data_access_groups:
            # REDCap unique group names: no hyphens/spaces, lower case,
            # at most 18 characters
            unique_group = dag.replace("-", "").replace(" ", "_").lower()[:18]
            conex = {
                "token": redcap_api_key,
                "content": "dag",
                "action": "switch",
                "dag": unique_group,
                "returnFormat": "json",
            }
            response = requests.post(redcap_url, data=conex)
            if response.text != "1":
                # BUG FIX: a space was missing between "not" and "switch"
                logger.warning(
                    f"Data access group ID: {dag}. Warning: Could not "
                    f"switch DAG to unique group name: {unique_group}"
                )
                continue
            conex = {
                "token": redcap_api_key,
                "content": "record",
                "action": "export",
                "format": "csv",
                "type": "flat",
                "csvDelimiter": "",
                "rawOrLabel": "label",
                "rawOrLabelHeaders": "raw",
                "exportCheckboxLabel": "false",
                "exportSurveyFields": "false",
                "exportDataAccessGroups": "false",
                "returnFormat": "json",
            }
            try:
                response = requests.post(redcap_url, data=conex)
                df_new = pd.read_csv(
                    io.StringIO(response.text),
                    dtype={"subjid": "str"},
                    keep_default_na=False,
                )
                df_new["redcap_data_access_group"] = dag
                df_list.append(df_new)
                logger.debug(
                    f"Data access group ID: {dag}, HTTP Status: {response.status_code}"
                )
            except pd.errors.EmptyDataError:
                # Some DAGs legitimately return no CSV body
                logger.warning(
                    f"Data access group ID: {dag}, "
                    f"HTTP Status: {response.status_code}. "
                    "Warning: Could not retrieve data from unique group "
                    f"name: {unique_group}"
                )
                continue
        if len(df_list) > 0:
            data = pd.concat(df_list, axis=0)
        else:
            data = None
    elapsed = time.perf_counter() - started
    row_count = 0 if data is None else len(data)
    logger.info(
        f"REDCap records export complete in {elapsed:.1f}s " f"(rows={row_count})"
    )
    return data
[docs]
def get_data_dictionary(redcap_url: str, redcap_api_key: str) -> pandas.DataFrame:
    """:py:class:`pandas.DataFrame` : Returns a data dictionary from the REDCap API.

    Parameters
    ----------
    redcap_url : str
        REDCap URL.
    redcap_api_key : str
        REDCap API key.

    Returns
    -------
    pandas.DataFrame
        Data dictionary from the REDCap API.
    """
    payload = {
        "token": redcap_api_key,
        "content": "metadata",
        "format": "csv",
        "returnFormat": "json",
    }
    response = requests.post(redcap_url, data=payload)
    # Keep empty cells as "" (not NaN) so downstream string ops work
    return pd.read_csv(io.StringIO(response.text), keep_default_na=False)
[docs]
def get_missing_data_codes(redcap_url: str, redcap_api_key: str) -> dict[str, str]:
    """:py:class:`dict` : Returns missing data codes from the REDCAP API, using the project metadata.

    Parameters
    ----------
    redcap_url : str
        REDCap URL.
    redcap_api_key : str
        REDCap API key.

    Returns
    -------
    dict
        A dict mapping missing data code labels to code values, taken from
        the REDCap project metadata. An empty dict is returned in the case
        there are no missing data codes.
    """  # noqa: E501
    conex = {
        "token": redcap_api_key,
        "content": "project",
        "format": "csv",
        "returnFormat": "json",
    }
    response = requests.post(redcap_url, data=conex)
    data = pd.read_csv(io.StringIO(response.text), keep_default_na=False)
    if data["missing_data_codes"].isna().all():
        return dict()
    # BUG FIX: with keep_default_na=False an unset project setting is read
    # as "" rather than NaN, which previously slipped past the isna() check
    # and crashed the split-based parsing below.
    missing_data_codes = str(data["missing_data_codes"].values[0]).strip()
    if not missing_data_codes:
        return dict()
    # Entries look like "<code>, <label>" and are "|"-separated; map each
    # label to its code. partition() tolerates labels containing commas.
    codes = dict()
    for entry in missing_data_codes.split("|"):
        code, _, label = entry.partition(",")
        codes[label.strip()] = code.strip()
    return codes
##########################################################
# Functions related to the data dictionary
##########################################################
[docs]
def get_values(x: typing.Iterable[str]) -> list[str]:
    """:py:class:`list` : Returns a list of values.

    Each item is expected to look like ``"<value>, <label>"``; the part
    before the first comma is returned for each item.

    Parameters
    ----------
    x : typing.Iterable
        An iterable of value tuples.

    Returns
    -------
    list
        A list of values.
    """
    return [item.partition(",")[0] for item in x]
[docs]
def get_value(x: typing.Iterable[str]) -> list[str]:
    """:py:class:`list` : Returns a list of values.

    .. warning::
        DEPRECATED.

    Parameters
    ----------
    x : typing.Iterable
        An iterable of value tuples.

    Returns
    -------
    list
        A list of values.
    """
    message = (
        "`redcap_data.get_value` is deprecated; "
        "use `redcap_data.get_values` instead."
    )
    warnings.warn(message, DeprecationWarning, stacklevel=2)
    return get_values(x)
[docs]
def get_labels(x: typing.Iterable[str]) -> list[str]:
    """:py:class:`list` : Returns a list of labels.

    Each item is expected to look like ``"<value>, <label>"``; everything
    after the first comma (stripped) is returned for each item.

    Parameters
    ----------
    x : typing.Iterable
        An iterable of label tuples.

    Returns
    -------
    list
        A list of labels.
    """
    return [item.partition(",")[2].strip() for item in x]
[docs]
def get_label(x: typing.Iterable[str]) -> list[str]:
    """:py:class:`list` : Returns a list of labels.

    .. warning::
        DEPRECATED.

    Parameters
    ----------
    x : typing.Iterable
        An iterable of label tuples.

    Returns
    -------
    list
        A list of labels.
    """
    message = (
        "`redcap_data.get_label` is deprecated; "
        "use `redcap_data.get_labels` instead."
    )
    warnings.warn(message, DeprecationWarning, stacklevel=2)
    return get_labels(x)
[docs]
def add_answer_dict(dictionary: pandas.DataFrame) -> pandas.DataFrame:
    """:py:class:`pandas.DataFrame` : Returns the REDCap schema data dictionary with a lookup dict of labels and values.

    By default, ignores Yes/No/Unknown radio variables.

    Parameters
    ----------
    dictionary : pandas.DataFrame
        REDCap schema data dictionary.

    Returns
    -------
    pandas.DataFrame
        An updated REDCap schema data dictionary with a lookup dict of labels
        and values (in an ``"answer_dict"`` column).
    """  # noqa: E501
    new_dictionary = dictionary.copy()
    # Get categories from dictionary
    answers = new_dictionary["select_choices_or_calculations"].copy()
    # This may throw an error if there are variables of type: slider or calc
    no_answers_ind = answers.fillna("").apply(
        lambda x: (len(x) > 0) & (x.count("|") == 0) & (x.count(",") == 0)
    )
    # Use the non-deprecated helper (previously called the deprecated
    # `is_yesno`, emitting a DeprecationWarning per row)
    yes_no_unknown_ind = answers.fillna("").apply(is_yesno_question)
    answers.loc[(no_answers_ind | yes_no_unknown_ind)] = np.nan
    # Split "<value>, <label> | ..." entries into lists of "<value>, <label>"
    answers = answers.str.rstrip("|,").str.split(r"\|").fillna("")
    answers = answers.apply(lambda x: [y.strip() for y in x])
    # This fixes the missing answers ind
    answers = answers.apply(lambda x: [y for y in x if y != ""])
    # Build a {label: value} dict per row, using the non-deprecated helpers
    answers = answers.apply(lambda x: dict(zip(get_labels(x), get_values(x))))
    answers.name = "answer_dict"
    new_dictionary = pd.concat([new_dictionary, answers], axis=1)
    return new_dictionary
[docs]
def list_categorical_onehot_columns(
    dictionary_row: dict[str, typing.Any], data: pandas.DataFrame, sep: str = "___"
) -> list[str]:
    """:py:class:`list` Returns a list of categorical onehot-encoded columns in the given dataframe.

    Only answers that actually occur in the data column are kept.

    Parameters
    ----------
    dictionary_row : dict
        A row of the data dictionary.
    data : pandas.DataFrame
        The incoming data.
    sep : str, default="___"
        Separator of field/variable name and value in the list.

    Returns
    -------
    list
        A list of categorical onehot-encoded columns in the given dataframe.
    """  # noqa: E501
    field = dictionary_row["field_name"]
    return [
        field + sep + answer
        for answer in dictionary_row["answer_dict"]
        if answer in data[field].values
    ]
[docs]
def list_checkbox_onehot_columns(
    dictionary_row: dict[str, typing.Any], data: pandas.DataFrame, sep: str = "___"
) -> list[str]:
    """:py:class:`list` Returns a list of checkbox onehot-encoded columns in the given dataframe.

    Only candidate columns that exist in the data are kept.

    Parameters
    ----------
    dictionary_row : dict
        A row of the data dictionary.
    data : pandas.DataFrame
        The incoming data.
    sep : str, default="___"
        Optional separator of field/variable name and value in the list.

    Returns
    -------
    list
        A list of checkbox onehot-encoded columns in the given dataframe.
    """  # noqa: E501
    field = dictionary_row["field_name"]
    candidates = (field + sep + answer for answer in dictionary_row["answer_dict"])
    return [col for col in candidates if col in data.columns]
[docs]
def get_section_prefix(x: str) -> str:
    """:py:class:`str` : Returns the section prefix.

    For ``"daily*"`` names, the part after ``"_data"`` is returned;
    otherwise the part before the first underscore.

    Parameters
    ----------
    x : str
        Section name/value.

    Returns
    -------
    str
        The section prefix.
    """
    if x.startswith("daily"):
        return x.split("_data")[-1]
    return x.split("_")[0]
[docs]
def add_onehot_variables(
    data: pandas.DataFrame, dictionary: pandas.DataFrame, sep: str = "___"
) -> pandas.DataFrame:
    """:py:class:`pandas.DataFrame` : Returns the data dictionary with rows for onehot-encoded categorical variables.

    Add new rows to the dictionary for onehot-encoded categorical variables,
    using only the answers that exist within the data, e.g. if checkbox columns
    exist (after removing columns with only 'Unchecked') or if radio column
    answers are present for at least one subjid. Section headers are also
    added as their own rows of ``field_type`` "section".

    Parameters
    ----------
    data : pandas.DataFrame
        The incoming data.
    dictionary : pandas.DataFrame
        The data dictionary (must contain an ``"answer_dict"`` column, see
        :func:`add_answer_dict`).
    sep : str, default="___"
        Optional separator of field/variable names and values.

    Returns
    -------
    pandas.DataFrame
        The data dictionary with rows for onehot-encoded categorical variables.
    """  # noqa: E501
    new_dictionary = dictionary.copy()
    # Record each variable's "parent" as the prefix before the first "_"
    new_dictionary["parent"] = ""
    ind = new_dictionary["field_name"].str.contains("_")
    new_dictionary.loc[ind, "parent"] = new_dictionary.loc[ind, "field_name"].apply(
        lambda x: x.split("_")[0]
    )
    # Radio/dropdown variables that have at least one answer option
    ind = new_dictionary["answer_dict"].apply(len) > 0
    categorical_ind = new_dictionary["field_type"].isin(["radio", "dropdown"])
    columns = ["field_name", "answer_dict"]
    new_variables = new_dictionary.loc[(ind & categorical_ind)].copy()
    # Replace each field name with the LIST of onehot column names whose
    # answers actually occur in the data (flattened further below)
    new_variables.loc[:, "field_name"] = new_variables[columns].apply(
        list_categorical_onehot_columns, data=data, sep=sep, axis=1
    )
    checkbox_ind = new_dictionary["field_type"] == "checkbox"
    # Retain items in answer dict only if they match
    add_new_variables = new_dictionary.loc[checkbox_ind].copy()
    # apply() runs over the full dictionary; the assignment aligns on the
    # checkbox rows' index only
    add_new_variables.loc[:, "field_name"] = new_dictionary[columns].apply(
        list_checkbox_onehot_columns, data=data, sep=sep, axis=1
    )
    new_variables = pd.concat([new_variables, add_new_variables], axis=0)
    # Add these onehot variables directly beneath the original categorical
    # variables in the dictionary
    new_variables = new_variables.reset_index()
    # One dictionary row per onehot column: repeat each parent row once per
    # generated column name, then substitute the flattened names
    n_variables = new_variables["field_name"].apply(len)
    variable_list = sum(new_variables["field_name"].tolist(), [])
    new_variables = new_variables.loc[np.repeat(n_variables.index, n_variables)]
    new_variables.loc[:, "field_name"] = variable_list
    # Fractional offsets (0.1..0.9) slot the new rows between the parent row
    # and the next original row once the index is sorted
    new_variables["index"] += np.hstack([np.linspace(0.1, 0.9, n) for n in n_variables])
    new_variables = new_variables.set_index("index")
    new_variables.index.name = None
    # Discard information about section header and choices
    empty_columns = ["section_header", "select_choices_or_calculations"]
    new_variables.loc[:, empty_columns] = ""
    new_variables.loc[:, "text_validation_type_or_show_slider_number"] = ""
    new_variables.loc[:, "field_type"] = "binary"
    # NOTE(review): these split on the literal "___" rather than `sep` —
    # confirm whether a non-default separator is ever used
    new_variables.loc[:, "field_label"] = new_variables["field_name"].apply(
        lambda x: x.split("___")[-1]
    )
    new_variables.loc[:, "parent"] = new_variables["field_name"].apply(
        lambda x: x.split("___")[0]
    )
    new_dictionary = pd.concat([new_dictionary, new_variables], axis=0)
    new_dictionary = new_dictionary.sort_index().reset_index(drop=True)
    # Can drop answer_dict column now
    new_dictionary.drop(columns="answer_dict", inplace=True)
    # Add section headers as new rows in the data dictionary
    ind = new_dictionary["section_header"] != ""
    ind = ind.loc[ind].index
    sections = pd.DataFrame("", columns=new_dictionary.columns, index=ind)
    sections["field_label"] = new_dictionary.loc[ind, "section_header"].apply(
        lambda x: x.split(": ")[0]
    )
    sections["field_type"] = "section"
    sections["form_name"] = new_dictionary.loc[ind, "form_name"]
    sections["field_name"] = new_dictionary.loc[ind, "field_name"].apply(
        get_section_prefix
    )
    # Offset by -0.5 so each section row sorts just before the first field
    # carrying that header
    sections.index -= 0.5
    new_dictionary = pd.concat([new_dictionary, sections], axis=0)
    new_dictionary = new_dictionary.sort_index().reset_index(drop=True)
    return new_dictionary
[docs]
def is_yesno_question(x: str) -> bool:
    """:py:class:`bool` : Whether the string defines Yes/No(/Unknown) answer choices.

    Spaces are removed before comparison, so variations of the same choices
    string (e.g. ``"1, Yes | 0, No"``) are recognised.

    Parameters
    ----------
    x : str
        A REDCap choices string (``select_choices_or_calculations``).

    Returns
    -------
    bool
        ``True`` if the string defines Yes/No answers, optionally with an
        Unknown option; ``False`` otherwise.
    """
    # NOTE: the return annotation previously claimed `str`; the function has
    # always returned a bool.
    return x.replace(" ", "") in ("1,Yes|0,No|99,Unknown", "1,Yes|0,No")
[docs]
def is_yesno(x: str) -> bool:
    """:py:class:`bool` : Whether the string defines Yes/No(/Unknown) answer choices.

    .. warning::
        DEPRECATED. Use :func:`is_yesno_question` instead.

    Parameters
    ----------
    x : str
        A REDCap choices string (``select_choices_or_calculations``).

    Returns
    -------
    bool
        ``True`` if the string defines Yes/No(/Unknown) answers.
    """
    # NOTE: the return annotation previously claimed `str`; the wrapped
    # function returns a bool.
    warnings.warn(
        (
            "`redcap_data.is_yesno` is deprecated; "
            "use `redcap_data.is_yesno_question` instead."
        ),
        DeprecationWarning,
        stacklevel=2,
    )
    return is_yesno_question(x)
[docs]
def convert_dictionary_field_type(dictionary: pandas.DataFrame) -> pandas.DataFrame:
    """:py:class:`pandas.DataFrame` : Return a dictionary of variable types, based on REDCAP structure.

    REDCap field types are collapsed into the analysis types: 'units',
    'binary', 'date', 'numeric', 'freetext' and 'categorical'.

    Parameters
    ----------
    dictionary : pandas.DataFrame
        The REDCap data dictionary.

    Returns
    -------
    pandas.DataFrame
        A dictionary of variable types, based on REDCAP structure.
    """  # noqa : E501
    new_dictionary = dictionary.copy()
    val_column = "text_validation_type_or_show_slider_number"
    # Variables named "*_units" hold another variable's measurement units
    units_ind = new_dictionary["field_name"].str.endswith("_units")
    new_dictionary.loc[units_ind, "field_type"] = "units"
    # Radio/dropdown questions whose only answers are Yes/No(/Unknown).
    # Uses the non-deprecated helper (previously the deprecated `is_yesno`,
    # which emitted a DeprecationWarning per row).
    binary_ind = new_dictionary["field_type"].isin(
        ["radio", "dropdown"]
    ) & new_dictionary["select_choices_or_calculations"].apply(is_yesno_question)
    # Or truefalse/yesno types
    binary_ind |= new_dictionary["field_type"].isin(["truefalse", "yesno"])
    new_dictionary.loc[binary_ind, "field_type"] = "binary"
    # Discard answer options if they exist (if a Yes/No/Unknown radio)
    new_dictionary.loc[binary_ind, "select_choices_or_calculations"] = ""
    date_ind = new_dictionary[val_column].isin(["date_dmy", "datetime_dmy"])
    new_dictionary.loc[date_ind, "field_type"] = "date"
    numeric_ind = new_dictionary[val_column].isin(
        ["number", "integer"]
    ) | new_dictionary["field_type"].isin(["slider"])
    new_dictionary.loc[numeric_ind, "field_type"] = "numeric"
    freetext_ind = new_dictionary["field_type"].isin(["text", "notes", "descriptive"])
    new_dictionary.loc[freetext_ind, "field_type"] = "freetext"
    # Remaining radio/dropdown questions are general categoricals
    categorical_ind = new_dictionary["field_type"].isin(["radio", "dropdown"])
    new_dictionary.loc[categorical_ind, "field_type"] = "categorical"
    return new_dictionary
[docs]
def replace_with_nan_for_missing_code_checkbox(
    data: pandas.DataFrame, missing_data_codes: dict[str, typing.Any]
) -> pandas.DataFrame:
    """:py:class:`pandas.DataFrame` : Return the input dataframe with missing code checkbox values converted to NaN.

    If any of a checkbox field's missing-data-code columns is 'Checked' for a
    subject, every column of that field is set to NaN for that subject.

    Parameters
    ----------
    data : pandas.DataFrame
        The incoming data.
    missing_data_codes : dict
        A dict of missing code keys and values.

    Returns
    -------
    pandas.DataFrame
        The input dataframe with missing code checkbox values converted to NaN.
    """  # noqa : E501
    # Checkbox columns are named "<field>___<code>"; REDCap lower-cases the
    # missing data code in the column suffix
    missing_data_values = [x.lower() for x in missing_data_codes.values()]
    missing_columns = [
        col for col in data.columns if col.split("___")[-1] in missing_data_values
    ]
    # Per subject and per parent field: True when any of the field's
    # missing-code checkboxes is 'Checked' (transpose so the columns can be
    # grouped by their parent field name)
    nan_mask = (data[missing_columns] == "Checked").T.reset_index()
    nan_mask["index"] = nan_mask["index"].apply(lambda x: x.split("___")[0])
    nan_mask = nan_mask.groupby("index").any()
    # Broadcast the per-field flag to every column belonging to that field
    columns = [col for col in data.columns if col.split("___")[0] in nan_mask.index]
    nan_mask = nan_mask.loc[[col.split("___")[0] for col in columns]]
    nan_mask["column"] = columns
    nan_mask = nan_mask.set_index("column").T
    # Boolean-DataFrame mask assignment: blanks every flagged cell in place
    data[nan_mask] = np.nan
    return data
############################################
# Data transformations
############################################
[docs]
def is_unlisted_item(x: str) -> bool:
    """:py:class:`bool` : Whether a field name denotes an "unlisted item" variable.

    Digits are stripped before the check, so numbered repeats such as
    ``"med_unlisted_item2"`` also match.

    Parameters
    ----------
    x : str
        A field/variable name.

    Returns
    -------
    bool
        ``True`` if the name, ignoring digits, ends with ``"unlisted_item"``.
    """
    # NOTE: the annotations previously claimed Iterable[str] -> str; the
    # function is called on field-name strings and returns a bool.
    return "".join([y for y in x if y.isdigit() is False]).endswith("unlisted_item")
[docs]
def combine_unlisted_variables(
    data: pandas.DataFrame, dictionary: pandas.DataFrame, sep: str = "___"
) -> tuple[pandas.DataFrame]:
    """:py:class:`tuple` : Combine variables in repetitions of a question.

    Combine variables that exist in repeated versions of the same question
    (e.g. additional dropdown questions asked after Yes/No/Unknown questions
    for established variables).

    Parameters
    ----------
    data : pandas.DataFrame
        The incoming data.
    dictionary : pandas.DataFrame
        The REDCap data dictionary.
    sep : str, default="___"
        Optional value separator.

    Returns
    -------
    tuple
        The updated data and data dictionary.
    """
    # "*unlisted" fields are the parent questions; "*unlisted_item<N>"
    # fields hold their repeated free-choice answers
    unlisted_ind = dictionary["field_name"].str.endswith("unlisted")
    unlisted_columns = dictionary.loc[unlisted_ind, "field_name"]
    unlisted_item_ind = dictionary["field_name"].apply(is_unlisted_item)
    unlisted_item_columns = dictionary.loc[unlisted_item_ind, "field_name"]
    unlisted_item_columns = [
        col for col in unlisted_item_columns if col in data.columns
    ]
    # Map each parent column to its repeat columns (substring match)
    unlisted_columns_dict = {
        k: [v for v in unlisted_item_columns if k in v] for k in unlisted_columns
    }
    new_dictionary_list = []
    for ind in unlisted_columns.index:
        column = dictionary.loc[ind, "field_name"]
        # Unique answers given across all repeats of this question
        values = data[unlisted_columns_dict[column]].stack().unique()
        values = [val for val in values if val not in (np.nan, "", "Other")]
        # One new boolean onehot column per distinct answer
        new_data = pd.DataFrame(
            index=data.index, columns=[column + "_item" + sep + x for x in values]
        )
        for value in values:  # it's too slow...
            yes_ind = (data[unlisted_columns_dict[column]] == value).any(axis=1)
            new_data.loc[yes_ind, column + "_item" + sep + value] = True
        # Insert the new columns immediately before the parent column
        column_loc = data.columns.get_loc(column)
        data = pd.concat(
            [data.iloc[:, :column_loc], new_data, data.iloc[:, column_loc:]], axis=1
        )
        # Fractional indices (0.1..0.9) slot the new dictionary rows directly
        # after the parent row once the index is sorted
        new_dictionary_index = ind + np.linspace(0.1, 0.9, len(values))
        new_dictionary = pd.DataFrame(
            "", columns=dictionary.columns, index=new_dictionary_index
        )
        new_dictionary["field_type"] = "binary"
        new_dictionary["field_name"] = [
            column + "_item" + sep + value for value in values
        ]
        new_dictionary["field_label"] = values
        new_dictionary["parent"] = column
        # New rows inherit the parent question's form and branching logic
        new_dictionary["form_name"] = dictionary.loc[ind, "form_name"]
        new_dictionary["branching_logic"] = dictionary.loc[ind, "branching_logic"]
        new_dictionary_list.append(new_dictionary)
    dictionary = pd.concat([dictionary] + new_dictionary_list, axis=0)
    dictionary = dictionary.sort_index().reset_index(drop=True)
    return data, dictionary
[docs]
def rename_checkbox_variables(
    data: pandas.DataFrame, dictionary: pandas.DataFrame
) -> pandas.DataFrame:
    """:py:class:`pandas.DataFrame` : Rename checkbox variable columns.

    By default the suffix is their answer option value. Convert this answer
    option value to the answer option name.

    Parameters
    ----------
    data : pandas.DataFrame
        The incoming data.
    dictionary : pandas.DataFrame
        The REDCap data dictionary.

    Returns
    -------
    pandas.DataFrame
        The updated data.
    """
    checkbox_rows = dictionary.loc[dictionary["field_type"] == "checkbox"]
    column_mapping = {}
    for field, options in zip(
        checkbox_rows["field_name"], checkbox_rows["answer_dict"]
    ):
        for label, value in options.items():
            # Lower-case the value because REDCap automatically applies
            # this to missing value codes, if they exist
            column_mapping[field + "___" + value.lower()] = field + "___" + label
    data.rename(columns=column_mapping, inplace=True)
    return data
[docs]
def get_branching_logic_variables(branching_logic: str) -> list[str]:
    """:py:class:`list` : Return all variables included in the branching logic (including checkboxes variables).

    Parameters
    ----------
    branching_logic : str
        The branching logic string.

    Returns
    -------
    list
        The list of all variables included in the branching logic.
    """  # noqa : E501
    variables = []
    for fragment in branching_logic.split("[")[1:]:
        name = fragment.split("]")[0]
        # Checkbox references like "[var(3)]" become "var___3"
        variables.append(name.replace("(", "___").replace(")", ""))
    return variables
[docs]
def resolve_checkbox_branching_logic(
    data: pd.DataFrame, dictionary: pd.DataFrame
) -> pd.DataFrame:
    """:py:class:`pandas.DataFrame` : Resolves checkbox logic.

    By default, a cell is marked as 'Unchecked' in the absence of the
    positive, even if the question was not asked to the subjid. If the question
    was not asked to the subjid because of the branching logic, then set this
    to be NaN instead. This does not completely check the branching logic,
    which is a data quality issue!.

    Parameters
    ----------
    data : pandas.DataFrame
        The incoming data.
    dictionary : pandas.DataFrame
        The REDCap data dictionary.

    Returns
    -------
    pandas.DataFrame
        The data with the checkbox branching logic resolved.
    """
    # Dictionary rows for checkbox fields (despite the name, this is a
    # DataFrame subset, not a boolean index; its .index is iterated below)
    checkbox_ind = dictionary.loc[(dictionary["field_type"] == "checkbox")]
    # Per dictionary row: the variables referenced by its branching logic
    branching_logic_variables = dictionary["branching_logic"].apply(
        get_branching_logic_variables
    )
    for ind in checkbox_ind.index:
        branching_logic_columns = [
            col for col in data.columns if col in branching_logic_variables.loc[ind]
        ]
        # NOTE(review): only the presence (non-NaN) of the branching
        # variables is tested, not the logic's actual condition values
        remove_ind = data[branching_logic_columns].isna().any(axis=1)
        # Every onehot column belonging to this checkbox field
        checkbox_columns = [
            col
            for col in data.columns
            if (col.split("___")[0] == dictionary.loc[ind, "field_name"])
        ]
        data.loc[remove_ind, checkbox_columns] = np.nan
    return data
[docs]
def harmonise_age(
    data: pandas.DataFrame,
    age_columns: typing.Iterable[str] = ("demog_age", "demog_age_units"),
) -> pd.DataFrame:
    """:py:class:`pandas.DataFrame` : The data with ages harmonised.

    .. warning::
        DEPRECATED. Age should now be included in `conversion_table.csv`.

    Convert age from any units into age in years.

    Parameters
    ----------
    data : pandas.DataFrame
        The incoming data.
    age_columns : typing.Iterable, default=("demog_age", "demog_age_units")
        An iterable (e.g. tuple) of the age value and age unit column names.
        (The default was previously a mutable list, a Python anti-pattern.)

    Returns
    -------
    pandas.DataFrame
        The data with ages harmonised.
    """
    warnings.warn(
        (
            "`redcap_data.harmonise_age` is deprecated. "
            "Age should now be included in `conversion_table.csv`. "
            "Convert age from any units into age in years."
        ),
        DeprecationWarning,
        stacklevel=2,
    )
    # Standardise column names, then coerce the age values to float
    data = data.rename(columns=dict(zip(age_columns, ["demog_age", "demog_age_units"])))
    data.loc[:, "demog_age"] = pd.to_numeric(data["demog_age"], errors="coerce")
    data.loc[:, "demog_age"] = data["demog_age"].astype(float)
    # Convert months and days into years
    data.loc[(data["demog_age_units"] == "Months"), "demog_age"] *= 1 / 12
    data.loc[(data["demog_age_units"] == "Days"), "demog_age"] *= 1 / 365
    unit_list = ["Days", "Months", "Years"]
    # Standardize the units to 'Years'
    data.loc[data["demog_age_units"].isin(unit_list), "demog_age_units"] = "Years"
    return data
[docs]
def map_variable(
    variable: pandas.Series,
    mapping_dict: dict[str, typing.Any],
    non_nan_value: str = "Other / Unknown",
) -> pandas.Series:
    """:py:class:`pandas.Series` : Map a variable column using a dict.

    Any non-NaN value not in the dict keys is converted to the value specified
    by ``non_nan_value``. (The docstring previously referenced a nonexistent
    ``other_value_str`` parameter.)

    Parameters
    ----------
    variable : pandas.Series
        The variable column to map.
    mapping_dict : dict
        The mapping dict.
    non_nan_value : str, default="Other / Unknown"
        Optional value with which to replace non-NaN values not in the dict
        keys.

    Returns
    -------
    pandas.Series
        The mapped variable column.
    """
    # Values that are present (not NaN) but have no mapping entry
    non_nan_value_ind = ~variable.isin(mapping_dict.keys()) & variable.notna()
    variable = variable.map(mapping_dict)
    variable.loc[non_nan_value_ind] = non_nan_value
    return variable
[docs]
def load_units_conversion_table() -> pandas.DataFrame:
    """:py:class:`pandas.DataFrame` : Loads the conversion table from a CSV.

    Returns
    -------
    pandas.DataFrame
        The conversion table.
    """
    local_path = Path(__file__).parent.parent / "assets" / "conversion_table.csv"
    try:
        # Prefer the lookup table shipped in the `assets` subfolder adjacent
        # to the current folder
        return pd.read_csv(local_path)
    except FileNotFoundError:
        # Otherwise fall back to the copy in the VERTEX assets on GitHub
        return pd.read_csv(
            "https://raw.githubusercontent.com/ISARICResearch/"
            "VERTEX/refs/heads/main/assets/conversion_table.csv"
        )
[docs]
def load_countries_table(encoding: str = "latin-1") -> pandas.DataFrame:
    """:py:class:`pandas.DataFrame` : Loads countries from a CSV.

    Parameters
    ----------
    encoding : str, default="latin-1"
        Optional file encoding.

    Returns
    -------
    pandas.DataFrame
        The countries table.
    """
    local_path = Path(__file__).parent.parent / "assets" / "countries.csv"
    try:
        # Prefer the lookup table shipped in the `assets` subfolder adjacent
        # to the current folder
        return pd.read_csv(local_path, encoding=encoding)
    except FileNotFoundError:
        # Otherwise fall back to the copy in the VERTEX assets on GitHub
        return pd.read_csv(
            "https://raw.githubusercontent.com/ISARICResearch/"
            "VERTEX/refs/heads/main/assets/countries.csv",
            encoding=encoding,
        )
[docs]
def homogenise_variables(
    data: pandas.DataFrame, dictionary: pandas.DataFrame
) -> tuple[pd.DataFrame]:
    """:py:class:`pandas.DataFrame` : Converts variables in given units in the data based on a conversion table.

    Parameters
    ----------
    data : pandas.DataFrame
        The incoming data.
    dictionary : pandas.DataFrame
        The REDCap data dictionary (field labels gain the target unit).

    Returns
    -------
    tuple
        The data with unit conversions applied, and the updated dictionary.
    """  # noqa : E501
    conversion_table = load_units_conversion_table()
    for index, row in conversion_table.iterrows():
        from_unit = row["from_unit"]
        to_unit = row["to_unit"]
        value_col = row["variable"]
        unit_col = row["variable_unit"]
        conversion_factor = row["conversion_factor"]
        try:
            # Ensure that the value column is numeric
            data.loc[:, value_col] = pd.to_numeric(data[value_col], errors="coerce")
            # Lymphocyte/neutrophil absolute counts are converted into a
            # percentage of the total WBC count rather than by a fixed factor
            check_ind = (
                value_col in ["labs_lymphocyte", "labs_neutrophil"]
                and from_unit == "10^9/L"
                and to_unit == "%"
            )
            if check_ind:
                total_wbc_col = "labs_wbccount"
                if total_wbc_col in data.columns:
                    # Ensure the total WBC count column is numeric
                    data.loc[:, total_wbc_col] = pd.to_numeric(
                        data[total_wbc_col], errors="coerce"
                    )
                    # Apply conversion only to non-empty values
                    mask = (
                        (data[unit_col] == from_unit)
                        & data[value_col].notna()
                        & data[total_wbc_col].notna()
                    )
                    data.loc[mask, value_col] = 100 * (
                        data.loc[mask, value_col] / data.loc[mask, total_wbc_col]
                    )
                    data.loc[mask, unit_col] = to_unit
                continue
            # Only apply the conversion if the factor is not NaN and the
            # value_col is not empty
            if not pd.isna(conversion_factor):
                mask = (data[unit_col] == from_unit) & data[value_col].notna()
                # Apply the conversion
                data.loc[mask, value_col] *= conversion_factor
            # Set all units to the target unit
            data.loc[data[unit_col] == from_unit, unit_col] = to_unit
            # Record the target unit in the dictionary's field label
            dictionary_ind = dictionary["field_name"] == value_col
            dictionary.loc[dictionary_ind, "field_label"] += f" ({to_unit})"
        except Exception:
            # Best-effort: a missing column or malformed table row must not
            # abort the remaining conversions — log instead of silent pass
            logger.debug(
                "Unit conversion skipped for %s", value_col, exc_info=True
            )
    # BUG FIX: `in` on a pandas Series tests the *index*, not the values, so
    # the old check (`... not in conversion_table["variable"]`) was always
    # True and the deprecated age harmonisation always ran
    if "demog_age" not in conversion_table["variable"].values:
        try:
            data = harmonise_age(data)
        except Exception:
            pass
    return data, dictionary
[docs]
def convert_onehot_to_binary(
data: pandas.DataFrame, dictionary: pandas.DataFrame
) -> pandas.DataFrame:
""":py:class:`pandas.DataFrame` : Converts onehot-encoded columns in the data.
The conversions will be True/False/NaN values, and answers from the data
dictionary discarded if they exist.
Parameters
----------
data : pandas.DataFrame
The incoming data.
dictionary : pandas.DataFrame
The REDCap data dictionary.
Returns
-------
pandas.DataFrame
The data with the one-hot columns appropriately converted.
"""
binary_ind = dictionary["field_type"] == "binary"
binary_columns = dictionary.loc[binary_ind, "field_name"].tolist()
binary_columns = [col for col in binary_columns if col in data.columns]
mapping_dict = {
"Yes": True,
"Checked": True,
"No": False,
"Unchecked": False,
"Unknown": np.nan,
}
with pd.option_context("future.no_silent_downcasting", True):
data.loc[:, binary_columns] = data[binary_columns].replace(mapping_dict)
return data
############################################
# Initial data processing
############################################
[docs]
def initial_data_processing(
    data: pandas.DataFrame,
    dictionary: pandas.DataFrame,
    missing_data_codes: dict[str, typing.Any],
) -> tuple[pandas.DataFrame]:
    """:py:class:`tuple` : Initial processing function invoked after the REDCap API call.

    Parameters
    ----------
    data : pandas.DataFrame
        The incoming REDCap data.
    dictionary : pandas.DataFrame
        The REDCap data dictionary.
    missing_data_codes : dict
        The dict of missing code keys and values (may be ``None``).

    Returns
    -------
    tuple
        A tuple consisting of the updated data and data dictionary dataframes.
    """  # noqa : E501
    # Replace empty cells or 'Unknown' with NaN
    with pd.option_context("future.no_silent_downcasting", True):
        data = data.replace(["", "Unknown", "unknown"], np.nan)
    # Replace missing data codes with NaN
    if missing_data_codes is not None:
        with pd.option_context("future.no_silent_downcasting", True):
            data = data.replace(list(missing_data_codes.keys()), np.nan)
    # Replace values in checkbox variables with NaN if the checkbox missing
    # data code column is 'Checked'
    data = replace_with_nan_for_missing_code_checkbox(data, missing_data_codes)
    # Remove columns where all the data is a negative answer option
    # (or missing answer)
    remove_values = ["", "no", "never smoked", "unchecked", "nan"]
    # Bug fix: guard against missing_data_codes being None — the replace
    # above already guards, but this line previously raised AttributeError.
    if missing_data_codes is not None:
        remove_values += [x.lower() for x in missing_data_codes]
    remove_columns = data.columns[
        data.astype(str).map(lambda x: (x.lower().strip() in remove_values)).all(axis=0)
    ]
    data = data[[col for col in data.columns if col not in remove_columns]]
    # Convert 'Unchecked' to NaN when a checkbox question wasn't asked
    # to a subjid because of their previous answers (i.e. the branching logic)
    # TODO: this needs updating based on branching logic values,
    # not just the variables themselves
    data = resolve_checkbox_branching_logic(data, dictionary)
    # Add a python dict of choice options to the dictionary
    new_dictionary = dictionary.copy()
    # Remove rows corresponding to the deleted columns of the data (ignore
    # checkbox columns here): keep variables whose base name (text before
    # the '___' checkbox suffix) still appears among the remaining columns.
    remove_variables = [
        x
        for x in remove_columns.map(lambda x: x.split("___")[0])
        if x not in data.columns.map(lambda x: x.split("___")[0])
    ]
    # Forward-fill section headers first so that dropping rows below cannot
    # orphan the fields that follow a removed section-header row.
    new_dictionary["section_header"] = new_dictionary["section_header"].replace(
        "", np.nan
    )
    new_dictionary["section_header"] = new_dictionary["section_header"].ffill()
    new_dictionary = new_dictionary.loc[
        ~new_dictionary["field_name"].isin(remove_variables)
    ]
    # Blank out repeated headers so each section header appears only once.
    new_dictionary["section_header"] = new_dictionary["section_header"].mask(
        new_dictionary["section_header"].duplicated()
    )
    new_dictionary["section_header"] = new_dictionary["section_header"].fillna("")
    new_dictionary = new_dictionary.reset_index(drop=True)
    new_dictionary = add_answer_dict(new_dictionary)
    # Rename checkbox variables
    data = rename_checkbox_variables(data, new_dictionary)
    # Convert the REDCap field types and add new onehot-encoded categorical
    # variables to the dictionary (as they will be onehot-encoded in
    # descriptive analysis), without onehot-encoding these yet (because this
    # may affect imputation etc.)
    new_dictionary = add_onehot_variables(data, new_dictionary)
    new_dictionary = convert_dictionary_field_type(new_dictionary)
    columns = [
        "field_name",
        "form_name",
        "field_type",
        "field_label",
        "parent",
        "branching_logic",
    ]
    new_dictionary = new_dictionary[columns]
    # Convert Yes(Checked)/No(Unchecked)/Unknown to True/False/NaN
    data = convert_onehot_to_binary(data, new_dictionary)
    # Convert numerical data to numeric type and homogenise if mixed units
    numeric_ind = new_dictionary["field_type"] == "numeric"
    numeric_columns = new_dictionary.loc[numeric_ind, "field_name"].tolist()
    data[numeric_columns] = data[numeric_columns].apply(pd.to_numeric, errors="coerce")
    # NOTE(review): homogenise_variables receives and returns the ORIGINAL
    # dictionary; any dictionary updates it makes (e.g. unit suffixes on
    # field labels) are not propagated into new_dictionary, which is what
    # this function returns — confirm whether this is intended.
    data, dictionary = homogenise_variables(data, dictionary)
    # Convert columns with dates into datetime
    date_ind = new_dictionary["field_type"] == "date"
    date_columns = new_dictionary.loc[date_ind, "field_name"].tolist()
    data[date_columns] = data[date_columns].apply(pd.to_datetime, errors="coerce")
    data, new_dictionary = combine_unlisted_variables(data, new_dictionary)
    return data, new_dictionary
[docs]
def get_df_map(
    data: pandas.DataFrame, dictionary: pandas.DataFrame
) -> tuple[pandas.DataFrame | dict[str, typing.Any]]:
    """:py:class:`tuple` : Returns a dataframe with single-event rows converted to a format with one row per patient.

    Parameters
    ----------
    data : pandas.DataFrame
        The incoming REDCap data.
    dictionary : pandas.DataFrame
        The REDCap data dictionary.

    Returns
    -------
    tuple
        The transformed one-row-per-patient dataframe, the updated data
        dictionary, and the quality report dict.
    """  # noqa : E501
    df_map = data.copy()
    # Restrict to variables belonging to the patient-level forms.
    forms = ["presentation", "daily", "outcome"]
    columns = dictionary.loc[dictionary["form_name"].isin(forms), "field_name"].tolist()
    columns = [col for col in columns if col in df_map.columns]
    # Rows whose (comma-delimited) form list includes presentation or outcome.
    ind = data["form_name"].apply(
        lambda x: any(y in x.split(",") for y in ["presentation", "outcome"])
    )
    df_map = df_map.reset_index(drop=True)
    ind = ind.reset_index(drop=True) if hasattr(ind, "reset_index") else ind
    ###########################################################################
    ###########################################################################
    # QUALITY CHECK 1 Patients has either Presentation or Outcome forms
    included_subjid = set(df_map.loc[ind, "subjid"].dropna().unique())
    # Keep deterministic order based on first appearance in source data.
    missing_id_QC1 = [
        subjid
        for subjid in pd.unique(data["subjid"])
        if pd.notna(subjid) and subjid not in included_subjid
    ]
    qc = "QUALITY CHECK 1: Patient does not have Presentation or Outcome forms"
    quality_report = {qc: missing_id_QC1}
    # (columns was already filtered against df_map.columns above, and
    # df_map's columns have not changed since — no need to re-filter.)
    df_map = df_map.loc[ind, columns]
    # Collapse each patient's rows into one via backfill, then deduplicate.
    df_map = df_map.set_index("subjid").groupby(level=0).bfill()
    df_map = df_map.drop(columns=[col for col in df_map.columns if "redcap" in col])
    df_map = df_map.reset_index().drop_duplicates("subjid")
    df_map = df_map.reset_index(drop=True)
    # Bucket any non-binary (or missing) sex answers together.
    other_value_ind = ~df_map["demog_sex"].isin(["Male", "Female"])
    df_map.loc[other_value_ind, "demog_sex"] = "Other / Unknown"
    mapping_dict = {
        "Discharged alive": "Discharged",  # :)
        "Discharged against medical advice": "Discharged",  # :)
        "Death": "Death",  # :(
        "Palliative care": "Death",  # :(
    }
    df_map["outco_binary_outcome"] = map_variable(
        df_map["outco_outcome"].fillna("Censored"),
        mapping_dict,
        other_value_str="Censored",
    )
    # Register the derived outcome variable (and its one-hot children) in
    # the data dictionary.
    outcome_dict = {}
    outcomes = ["Death", "Discharged", "Censored"]
    outcome_dict["field_name"] = ["outco_binary_outcome"]
    outcome_dict["field_name"] += ["outco_binary_outcome___" + x for x in outcomes]
    outcome_dict["form_name"] = "outcome"
    outcome_dict["field_type"] = ["categorical"] + ["binary"] * len(outcomes)
    outcome_dict["field_label"] = ["Outcome (binary)"] + outcomes
    outcome_dict["parent"] = ["outco"] + ["outco_binary_outcome"] * len(outcomes)
    outcome_dict["branching_logic"] = ""
    dictionary = pd.concat([dictionary, pd.DataFrame.from_dict(outcome_dict)], axis=0)
    dictionary = dictionary.reset_index(drop=True)
    logger.debug(f"Data contains {df_map.shape[0]} patients")
    return df_map, dictionary, quality_report
[docs]
def get_redcap_data(
    redcap_url: str,
    redcap_api_key: str,
    data_access_groups: typing.Iterable[str] | None = None,
    user_assigned_to_dag: bool | None = False,
    country_mapping: dict | None = None,
) -> tuple[pandas.DataFrame | dict[str, pandas.DataFrame] | dict[str, typing.Any]]:
    """:py:class:`tuple` : Returns data from REDCap API and transforms them into analysis-ready dataframes.

    Parameters
    ----------
    redcap_url : str
        The REDCap database URL.
    redcap_api_key : str
        The REDCap API key.
    data_access_groups : typing.Iterable, default=None
        Optional iterable of data access group (DAG) names.
    user_assigned_to_dag : bool, default=False
        Whether the user is assigned to a DAG. NOTE: this parameter shadows
        the module-level function of the same name.
    country_mapping : dict, default=None
        The countries table.

    Returns
    -------
    tuple
        The one-row-per-patient dataframe, the per-form dataframes dict,
        the updated data dictionary, and the quality report dict.
    """  # noqa : E501
    total_started = time.perf_counter()
    logger.info("REDCap data pipeline start")
    # Fetch the raw records, data dictionary and missing-data codes from the
    # REDCap API, logging the elapsed time for each step.
    step_started = time.perf_counter()
    data = get_records(
        redcap_url,
        redcap_api_key,
        data_access_groups=data_access_groups,
        user_assigned_to_dag=user_assigned_to_dag,
    )
    logger.info(
        f"REDCap step get_records finished in {time.perf_counter() - step_started:.1f}s"
    )
    step_started = time.perf_counter()
    dictionary = get_data_dictionary(redcap_url, redcap_api_key)
    logger.info(
        "REDCap step get_data_dictionary finished in "
        f"{time.perf_counter() - step_started:.1f}s"
    )
    step_started = time.perf_counter()
    missing_data_codes = get_missing_data_codes(redcap_url, redcap_api_key)
    logger.info(
        "REDCap step get_missing_data_codes finished in "
        f"{time.perf_counter() - step_started:.1f}s"
    )
    # Clean the raw data (NaN substitution, checkbox handling, field-type
    # conversion) and derive the working data dictionary.
    step_started = time.perf_counter()
    data, new_dictionary = initial_data_processing(data, dictionary, missing_data_codes)
    logger.info(
        "REDCap step initial_data_processing finished in "
        f"{time.perf_counter() - step_started:.1f}s "
        f"(rows={len(data)}, cols={len(data.columns)})"
    )
    # Ensure the REDCap bookkeeping columns exist (added empty when absent).
    redcap_columns = ["redcap_event_name", "redcap_repeat_instrument"]
    redcap_columns += ["redcap_repeat_instance", "redcap_data_access_group"]
    redcap_columns = [col for col in redcap_columns if col not in data.columns]
    data = pd.concat([data, pd.DataFrame(columns=redcap_columns)], axis=1)
    # Get forms and events from the API
    step_started = time.perf_counter()
    form, form_event = get_form_event(redcap_url, redcap_api_key)
    logger.info(
        f"REDCap step get_form_event finished in "
        f"{time.perf_counter() - step_started:.1f}s"
    )
    # Convert repeating forms from label to name
    form_dict = dict(zip(form["form_label"], form["form_name"]))
    data.loc[:, "form_name"] = data["redcap_repeat_instrument"].map(form_dict)
    # Else convert events into a string-delimited str of forms
    form_dict = dict(zip(form_event["event_name"], form_event["form_name"]))
    data.loc[data["form_name"].isna(), "form_name"] = data.loc[
        data["form_name"].isna(), "redcap_event_name"
    ].map(form_dict)
    # Drop rows that could not be attributed to any form.
    data = data.loc[data["form_name"].notna()].reset_index(drop=True)
    step_started = time.perf_counter()
    df_map, new_dictionary, quality_report = get_df_map(data, new_dictionary)
    logger.info(
        f"REDCap step get_df_map finished in "
        f"{time.perf_counter() - step_started:.1f}s (rows={len(df_map)})"
    )
    step_started = time.perf_counter()
    df_forms_dict = get_df_forms(data, new_dictionary)
    logger.info(
        f"REDCap step get_df_forms finished in "
        f"{time.perf_counter() - step_started:.1f}s "
        f"(forms={len(df_forms_dict)})"
    )
    # Derive an ISO country code per patient, preferring (in order): an
    # explicit demog_country field, the DAG site name, or a caller-supplied
    # country mapping.
    if "demog_country" in dictionary["field_name"].values:
        countries = load_countries_table(encoding="latin-1")
        df_map["country_iso"] = df_map["demog_country"].replace(
            dict(zip(countries["Country"], countries["Code"]))
        )
    elif country_mapping is None:
        # Assumes DAG names look like '<prefix>-<iso>-...' — TODO confirm.
        dag = data[["subjid", "redcap_data_access_group"]].drop_duplicates()
        dag = dag.rename(columns={"redcap_data_access_group": "site"})
        dag["country_iso"] = dag["site"].apply(lambda x: x.split("-")[1])
        df_map = pd.merge(df_map, dag, on="subjid", how="left")
    else:
        # TODO: Need something better here?
        try:
            # NOTE(review): the second assignment overwrites the first, and
            # relies on a 'country' column existing — verify intent.
            df_map["country_iso"] = df_map["subjid"].str.split("-").str[0]
            df_map["country_iso"] = df_map["country"].map(country_mapping)
        except Exception:
            df_map["country_iso"] = np.nan
    # Add country_iso to dictionary
    # NOTE(review): a NaN in country_iso would break the string concatenation
    # below — presumably upstream guarantees non-null codes; confirm.
    countries = df_map["country_iso"].drop_duplicates().tolist()
    country_dict = {}
    country_dict["field_name"] = ["country", "country_iso"]
    country_dict["field_name"] += ["country_iso___" + x for x in countries]
    country_dict["form_name"] = "presentation"
    country_dict["field_type"] = ["section", "categorical"]
    country_dict["field_type"] += ["binary"] * len(countries)
    country_dict["field_label"] = ["COUNTRY", "Country ISO Code"] + countries
    country_dict["parent"] = ["", "country"] + ["country_iso"] * len(countries)
    country_dict["branching_logic"] = ""
    new_dictionary = pd.concat(
        [new_dictionary, pd.DataFrame.from_dict(country_dict)], axis=0
    )
    new_dictionary = new_dictionary.reset_index(drop=True)
    logger.info(
        f"REDCap data pipeline complete in {time.perf_counter() - total_started:.1f}s"
    )
    return df_map, df_forms_dict, new_dictionary, quality_report