Skip to content

The Two Stages Procedure of Meir and Gorfine (2023) - Efron

This class implements the approach of Meir et al. (2022):

Examples:

1
2
3
4
    from pydts.fitters import TwoStagesFitter
    fitter = TwoStagesFitter()
    fitter.fit(df=train_df, event_type_col='J', duration_col='X')
    fitter.print_summary()

References

[1] Meir, Tomer*, Gutman, Rom*, and Gorfine, Malka, "PyDTS: A Python Package for Discrete-Time Survival Analysis with Competing Risks" (2022)

Source code in pydts/fitters.py
class TwoStagesFitter(ExpansionBasedFitter):

    """
    This class implements the approach of Meir et al. (2022):

    Example:
        ```py linenums="1"
            from pydts.fitters import TwoStagesFitter
            fitter = TwoStagesFitter()
            fitter.fit(df=train_df, event_type_col='J', duration_col='X')
            fitter.print_summary()
        ```

    References:
        [1] Meir, Tomer\*, Gutman, Rom\*, and Gorfine, Malka, "PyDTS: A Python Package for Discrete-Time Survival Analysis with Competing Risks" (2022)
    """

    def __init__(self):
        super().__init__()
        self.alpha_df = pd.DataFrame()
        self.beta_models = {}
        self.beta_models_params_attr = 'params_'

    def _alpha_jt(self, x, df, y_t, beta_j, n_jt, t, event):
        # Alpha_jt optimization objective
        partial_df = df[df[self.duration_col] >= t]
        if isinstance(self.covariates, list):
            expit_add = np.dot(partial_df[self.covariates], beta_j)
        elif isinstance(self.covariates, dict):
            expit_add = np.dot(partial_df[self.covariates[event]], beta_j)
        else:
            raise ValueError
        return ((1 / y_t) * np.sum(expit(x + expit_add)) - (n_jt / y_t)) ** 2

    def _fit_event_beta(self, expanded_df, event, model=CoxPHFitter, model_kwargs={}, model_fit_kwargs={}):
        # Model fitting for conditional estimation of Beta_j for specific event
        if isinstance(self.covariates, list):
            strata_df = expanded_df[self.covariates + [f'j_{event}', self.duration_col]].copy()
        elif isinstance(self.covariates, dict):
            strata_df = expanded_df[self.covariates[event] + [f'j_{event}', self.duration_col]].copy()
        else:
            raise TypeError
        strata_df.loc[:, f'{self.duration_col}_copy'] = np.ones_like(expanded_df[self.duration_col])

        beta_j_model = model(**model_kwargs)
        if isinstance(self.covariates, list):
            beta_j_model.fit(df=strata_df[self.covariates + [f'{self.duration_col}', f'{self.duration_col}_copy', f'j_{event}']],
                             duration_col=f'{self.duration_col}_copy', event_col=f'j_{event}', strata=self.duration_col,
                             **model_fit_kwargs, batch_mode=False)
        elif isinstance(self.covariates, dict):
            beta_j_model.fit(df=strata_df[self.covariates[event] + [f'{self.duration_col}', f'{self.duration_col}_copy', f'j_{event}']],
                             duration_col=f'{self.duration_col}_copy', event_col=f'j_{event}', strata=self.duration_col,
                             **model_fit_kwargs, batch_mode=False)
        return beta_j_model

    def _fit_beta(self, expanded_df, events, model=CoxPHFitter, model_kwargs={}, model_fit_kwargs={}):
        # Model fitting for conditional estimation of Beta_j for all events
        _model_kwargs_per_event = np.any([event in model_kwargs.keys() for event in events])
        _model_fit_kwargs_per_event = np.any([event in model_fit_kwargs.keys() for event in events])
        beta_models = {}
        for event in events:
            _model_kwargs = model_kwargs[event] if _model_kwargs_per_event else model_kwargs
            _model_fit_kwargs = model_fit_kwargs[event] if _model_fit_kwargs_per_event else model_fit_kwargs
            beta_models[event] = self._fit_event_beta(expanded_df=expanded_df, event=event,
                                                      model=model, model_kwargs=_model_kwargs,
                                                      model_fit_kwargs=_model_fit_kwargs)
        return beta_models

    def fit(self,
            df: pd.DataFrame,
            covariates: Union[list, dict] = None,
            event_type_col: str = 'J',
            duration_col: str = 'X',
            pid_col: str = 'pid',
            skip_expansion: bool = False,
            x0: Union[np.array, int] = 0,
            fit_beta_kwargs: dict = {},
            verbose: int = 2,
            nb_workers: int = WORKERS) -> dict:
        """
        This method fits a model to the discrete data.

        Args:
            df (pd.DataFrame): training data for fitting the model
            covariates (list): list of covariates to be used in estimating the regression coefficients
            event_type_col (str): The event type column name (must be a column in df),
                                  Right-censored sample (i) is indicated by event value 0, df.loc[i, event_type_col] = 0.
            duration_col (str): Last follow up time column name (must be a column in df).
            pid_col (str): Sample ID column name (must be a column in df).
            skip_expansion (boolean): Skips the dataframe expansion step. Use this option only if the provided dataframe (df) is already correctly expanded (see [1]).
                                      When set to True, the df is expected to be in the format produced by the pydts.utils.get_expanded_df() method, as if it were applied to the unexpanded data.
            x0 (Union[numpy.array, int], Optional): initial guess to pass to scipy.optimize.minimize function
            fit_beta_kwargs (dict, Optional): Keyword arguments to pass on to the estimation procedure.
                                              If different model for beta is desired, it can be defined here.
                                              For example:
                                              fit_beta_kwargs={
                                                    model=CoxPHFitter, # model object
                                                    model_kwargs={},  # keywords arguments to pass on to the model instance initiation
                                                    model_fit_kwargs={}  # keywords arguments to pass on to model.fit() method
                                              }
            verbose (int, Optional): The verbosity level of pandaallel
            nb_workers (int, Optional): The number of workers to pandaallel. If not sepcified, defaults to the number of workers available.
        Returns:
            event_models (dict): Fitted models dictionary. Keys - event names, Values - fitted models for the event.

        References:
            [1] Meir, Tomer and Gorfine, Malka, "Discrete-time Competing-Risks Regression with or without Penalization", https://arxiv.org/abs/2303.01186
        """

        self._validate_cols(df, event_type_col, duration_col, pid_col)
        self.events = [c for c in sorted(df[event_type_col].unique()) if c != 0]
        if (covariates is not None):
            cov_not_in_df = []
            if isinstance(covariates, list):
                cov_not_in_df = [cov for cov in covariates if cov not in df.columns]
            elif isinstance(covariates, dict):
                for event in self.events:
                    event_cov_not_in_df = [cov for cov in covariates[event] if cov not in df.columns]
                    cov_not_in_df.extend(event_cov_not_in_df)
            if len(cov_not_in_df) > 0:
                raise ValueError(f"Error during fit - missing covariates from df: {cov_not_in_df}")

        #pandarallel.initialize(verbose=verbose, nb_workers=nb_workers)
        if covariates is None:
            covariates = [col for col in df if col not in [event_type_col, duration_col, pid_col]]
        self.covariates = covariates
        self.event_type_col = event_type_col
        self.duration_col = duration_col
        self.pid_col = pid_col
        self.times = sorted(df[duration_col].unique())

        if not skip_expansion:
            expanded_df = self._expand_data(df=df, event_type_col=event_type_col, duration_col=duration_col,
                                                 pid_col=pid_col)
        else:
            print('Skipping data expansion step, only use this option if the provided dataframe (df) is already correctly expanded.')
            expanded_df = df

        self.beta_models = self._fit_beta(expanded_df, self.events, **fit_beta_kwargs)

        y_t = (df[duration_col]
               .value_counts()
               .sort_index(ascending=False)  # each event count for its occurring time and the times before
               .cumsum()
               .sort_index()
               )
        n_jt = df.groupby([event_type_col, duration_col]).size().to_frame().reset_index()
        n_jt.columns = [event_type_col, duration_col, 'n_jt']

        for event in self.events:

            n_et = n_jt[n_jt[event_type_col] == event].copy()

            if isinstance(self.beta_models[event], CoxPHFitter):
                self.beta_models_params_attr = 'params_'
                _res = Parallel(n_jobs=nb_workers)(delayed(minimize)(self._alpha_jt, x0=x0,
                                                                     args=(df, y_t.loc[row[duration_col]],
                                                                           getattr(self.beta_models[event],
                                                                                   self.beta_models_params_attr),
                                                                           row['n_jt'],
                                                                           row[duration_col], event),
                                                                     method='BFGS',
                                                                     options={'gtol': 1e-7, 'eps': 1.5e-08,
                                                                              'maxiter': 200})
                                                                     for _, row in n_et.iterrows())
                n_et['success'] = Parallel(n_jobs=nb_workers)(delayed(lambda row: row.success)(val)
                                                              for val in _res)
                n_et['alpha_jt'] = Parallel(n_jobs=nb_workers)(delayed(lambda row: row.x[0])(val)
                                                               for val in _res)

            elif isinstance(self.beta_models[event], ConditionalResultsWrapper) or \
                    isinstance(self.beta_models[event], RegularizedResultsWrapper):
                self.beta_models_params_attr = 'params'
                for idx, row in n_et.iterrows():
                    _res = minimize(self._alpha_jt,
                                    x0=x0,
                                    args=(df,
                                          y_t.loc[row[duration_col]],
                                          getattr(self.beta_models[event], self.beta_models_params_attr),
                                          row['n_jt'],
                                          row[duration_col],
                                          event),
                                    method='BFGS',
                                    options={'gtol': 1e-7, 'eps': 1.5e-08, 'maxiter': 200})
                    n_et.loc[idx, 'success'] = _res.success
                    n_et.loc[idx, 'alpha_jt'] = _res.x[0]
            else:
                raise ValueError

            # n_et['opt_res'] = n_et.parallel_apply(lambda row: minimize(self._alpha_jt, x0=x0,
            #                         args=(df, y_t.loc[row[duration_col]], event_beta_params, row['n_jt'],
            #                         row[duration_col], event), method='BFGS',
            #                         options={'gtol': 1e-7, 'eps': 1.5e-08, 'maxiter': 200}), axis=1)
            # n_et['success'] = n_et['opt_res'].parallel_apply(lambda val: val.success)
            # n_et['alpha_jt'] = n_et['opt_res'].parallel_apply(lambda val: val.x[0])

            assert_fit(n_et, self.times[:-1], event_type_col=event_type_col, duration_col=duration_col)  # todo move basic input validation before any optimization
            self.event_models[event] = [self.beta_models[event], n_et]
            self.alpha_df = pd.concat([self.alpha_df, n_et], ignore_index=True)
        return self.event_models

    def print_summary(self,
                      summary_func: str = "print_summary",
                      summary_kwargs: dict = {}) -> None:
        """
        This method prints the summary of the fitted models for all the events.

        Args:
            summary_func (str, Optional): print summary method of the fitted model type ("summary", "print_summary").
            summary_kwargs (dict, Optional): Keyword arguments to pass to the model summary function.

        Returns:
            None
        """
        from IPython.display import display
        display(self.get_beta_SE())

        for event, model in self.event_models.items():
            print(f'\n\nModel summary for event: {event}')
            display(model[1].set_index([self.event_type_col, self.duration_col]))


    def plot_event_alpha(self, event: Union[str, int], ax: plt.Axes = None, scatter_kwargs: dict = {},
                         show=True, title=None, xlabel='t', ylabel=r'$\alpha_{jt}$', fontsize=18,
                         color: str = None, label: str = None, ticklabelsize: int = 15) -> plt.Axes:
        """
        This function plots a scatter plot of the $ alpha_{jt} $ coefficients of a specific event.
        Args:
            event (Union[str, int]): event name
            ax (matplotlib.pyplot.Axes, Optional): ax to use
            scatter_kwargs (dict, Optional): keywords to pass to the scatter function
            show (bool, Optional): if to use plt.show()
            title (str, Optional): axes title
            xlabel (str, Optional): axes xlabel
            ylabel (str, Optional): axes ylabel
            fontsize (int, Optional): axes title, xlabel, ylabel fontsize
            color (str, Optional): color name to use
            label (str, Optional): label name
        Returns:
            ax (matplotlib.pyplot.Axes): output figure
        """

        assert event in self.events, f"Cannot plot event {event} alpha - it was not included during .fit()"

        if ax is None:
            fig, ax = plt.subplots(1, 1)
        ax.tick_params(axis='both', which='major', labelsize=ticklabelsize)
        ax.tick_params(axis='both', which='minor', labelsize=ticklabelsize)
        title = r'$\alpha_{jt}$' + f' for event {event}' if title is None else title
        label = f'{event}' if label is None else label
        color = 'tab:blue' if color is None else color
        alpha_df = self.event_models[event][1]
        ax.scatter(alpha_df[self.duration_col].values, alpha_df['alpha_jt'].values, label=label,
                   color=color, **scatter_kwargs)
        ax.set_title(title, fontsize=fontsize)
        ax.set_xlabel(xlabel, fontsize=fontsize)
        ax.set_ylabel(ylabel, fontsize=fontsize)
        if show:
            plt.show()
        return ax

    def plot_all_events_alpha(self, ax: plt.Axes = None, scatter_kwargs: dict = {}, colors: list = COLORS,
                              show: bool = True, title: Union[str, None] = None, xlabel: str = 't',
                              ylabel: str = r'$\alpha_{jt}$', fontsize: int = 18, ticklabelsize: int = 15) -> plt.Axes:
        """
        This function plots a scatter plot of the $ alpha_{jt} $ coefficients of all the events.
        Args:
            ax (matplotlib.pyplot.Axes, Optional): ax to use
            scatter_kwargs (dict, Optional): keywords to pass to the scatter function
            colors (list, Optional): colors names
            show (bool, Optional): if to use plt.show()
            title (str, Optional): axes title
            xlabel (str, Optional): axes xlabel
            ylabel (str, Optional): axes ylabel
            fontsize (int, Optional): axes title, xlabel, ylabel fontsize

        Returns:
            ax (matplotlib.pyplot.Axes): output figure
        """
        if ax is None:
            fig, ax = plt.subplots(1, 1)
        ax.tick_params(axis='both', which='major', labelsize=ticklabelsize)
        ax.tick_params(axis='both', which='minor', labelsize=ticklabelsize)
        title = r'$\alpha_{jt}$' + f' for all events' if title is None else title
        for idx, (event, model) in enumerate(self.event_models.items()):
            label = f'{event}'
            color = colors[idx % len(colors)]
            self.plot_event_alpha(event=event, ax=ax, scatter_kwargs=scatter_kwargs, show=False, title=title,
                                  ylabel=ylabel, xlabel=xlabel, fontsize=fontsize, label=label, color=color,
                                  ticklabelsize=ticklabelsize)
        ax.legend()
        if show:
            plt.show()
        return ax

    def predict_hazard_jt(self,
                          df: pd.DataFrame,
                          event: Union[str, int],
                          t: Union[Iterable, int]) -> pd.DataFrame:
        """
        This method calculates the hazard for the given event at the given time values if they were included in
        the training set of the event.

        Args:
            df (pd.DataFrame): samples to predict for
            event (Union[str, int]): event name
            t (Union[Iterable, int]): times to calculate the hazard for

        Returns:
            df (pd.DataFrame): samples with the prediction columns
        """
        self._validate_covariates_in_df(df.head())
        t = self._validate_t(t, return_iter=True)
        assert event in self.events, \
            f"Cannot predict for event {event} - it was not included during .fit()"

        model = self.event_models[event]
        alpha_df = model[1].set_index(self.duration_col)['alpha_jt'].copy()

        _t = np.array([t_i for t_i in t if (f'hazard_j{event}_t{t_i}' not in df.columns)])
        if len(_t) == 0:
            return df
        temp_df = df.copy()
        if isinstance(self.covariates, list):
            beta_j_x = temp_df[self.covariates].dot(getattr(model[0], self.beta_models_params_attr))
        elif isinstance(self.covariates, dict):
            beta_j_x = temp_df[self.covariates[event]].dot(getattr(model[0], self.beta_models_params_attr))
        temp_df[[f'hazard_j{event}_t{c}' for c in _t]] = pd.concat(
            [self._hazard_inverse_transformation(alpha_df[c] + beta_j_x) for c in _t], axis=1).values
        return temp_df

    def _hazard_transformation(self, a: Union[int, np.array, pd.Series, pd.DataFrame]) -> \
            Union[int, np.array, pd.Series, pd.DataFrame]:
        """
        This function defines the transformation of the hazard function such that
        $ h ( \lambda_j (t | Z) ) = \alpha_{jt} + Z^{T} \beta_{j} $

        Args:
            a (Union[int, np.array, pd.Series, pd.DataFrame]):

        Returns:
            i (Union[int, np.array, pd.Series, pd.DataFrame]): the inverse function applied on a. $ h^{-1} (a)$
        """

        i = logit(a)
        return i

    def _hazard_inverse_transformation(self, a: Union[int, np.array, pd.Series, pd.DataFrame]) -> \
            Union[int, np.array, pd.Series, pd.DataFrame]:
        """
        This function defines the inverse transformation of the hazard function such that
        $\lambda_j (t | Z) = h^{-1} ( \alpha_{jt} + Z^{T} \beta_{j} )$

        Args:
            a (Union[int, np.array, pd.Series, pd.DataFrame]):

        Returns:
            i (Union[int, np.array, pd.Series, pd.DataFrame]): the inverse function applied on a. $ h^{-1} (a) $
        """
        i = expit(a)
        return i

    def get_beta_SE(self):
        """
        This function returns the Beta coefficients and their Standard Errors for all the events.

        Returns:
            se_df (pandas.DataFrame): Beta coefficients and Standard Errors Dataframe

        """
        se_df = pd.DataFrame()
        for event, model in self.beta_models.items():
            mdf = pd.concat([model.params_, model.standard_errors_], axis=1)
            mdf.columns = [f'j{event}_params', f'j{event}_SE']
            se_df = pd.concat([se_df, mdf], axis=1)
        return se_df

    def get_alpha_df(self):
        """
        This function returns the Alpha coefficients for all the events.

        Returns:
            alpha_df (pandas.DataFrame): Alpha coefficients Dataframe

        """

        alpha_df = pd.DataFrame()
        for event, model in self.event_models.items():
            model_alpha_df = model[1].set_index([self.event_type_col, self.duration_col])
            model_alpha_df.columns = pd.MultiIndex.from_product([[event], model_alpha_df.columns])
            alpha_df = pd.concat([alpha_df, model_alpha_df], axis=1)

        return alpha_df

    def plot_all_events_beta(self, ax: plt.Axes = None, colors: list = COLORS, show: bool = True,
                             title: Union[str, None] = None, xlabel: str = 'Value',  ylabel: str = r'$\beta_{j}$',
                             fontsize: int = 18, ticklabelsize: int = 15) -> plt.Axes:
        """
        This function plots the $ beta_{j} $ coefficients and standard errors of all the events.
        Args:
            ax (matplotlib.pyplot.Axes, Optional): ax to use
            colors (list, Optional): colors names
            show (bool, Optional): if to use plt.show()
            title (str, Optional): axes title
            xlabel (str, Optional): axes xlabel
            ylabel (str, Optional): axes ylabel
            fontsize (int, Optional): axes title, xlabel, ylabel fontsize
            ticklabelsize (int, Optional): axes xticklabels, yticklabels fontsize
        Returns:
            ax (matplotlib.pyplot.Axes): output figure
        """
        if ax is None:
            fig, ax = plt.subplots(1, 1)
        title = r'$\beta_{j}$' + f' for all events' if title is None else title
        ax.tick_params(axis='both', which='major', labelsize=ticklabelsize)
        ax.tick_params(axis='both', which='minor', labelsize=ticklabelsize)
        se_df = self.get_beta_SE()

        for idx, col in enumerate(se_df.columns):
            if idx % 2 == 1:
                continue
            y = np.arange((idx//2)*len(se_df), (1+(idx//2))*len(se_df))
            ax.errorbar(x=se_df.iloc[:, idx].values, y=y,
                       color=colors[idx % len(colors)], xerr=se_df.iloc[:, idx+1].values, label=f'{col}',
                       markersize=6, ls='', marker='o')

        yt = list(se_df.index) * (len(se_df.columns) // 2)
        ax.set_yticks(np.arange(0, len(yt)))
        ax.set_yticklabels(yt)
        ax.set_title(title, fontsize=fontsize)
        ax.set_xlabel(xlabel, fontsize=fontsize)
        ax.set_ylabel(ylabel, fontsize=fontsize)
        ax.grid()
        plt.gca().invert_yaxis()
        ax.legend()
        if show:
            plt.show()
        return ax

fit(self, df, covariates=None, event_type_col='J', duration_col='X', pid_col='pid', skip_expansion=False, x0=0, fit_beta_kwargs={}, verbose=2, nb_workers=2) ¤

This method fits a model to the discrete data.

Parameters:

Name Type Description Default
df pd.DataFrame

training data for fitting the model

required
covariates list

list of covariates to be used in estimating the regression coefficients

None
event_type_col str

The event type column name (must be a column in df), Right-censored sample (i) is indicated by event value 0, df.loc[i, event_type_col] = 0.

'J'
duration_col str

Last follow up time column name (must be a column in df).

'X'
pid_col str

Sample ID column name (must be a column in df).

'pid'
skip_expansion boolean

Skips the dataframe expansion step. Use this option only if the provided dataframe (df) is already correctly expanded (see [1]). When set to True, the df is expected to be in the format produced by the pydts.utils.get_expanded_df() method, as if it were applied to the unexpanded data.

False
x0 Union[numpy.array, int], Optional

initial guess to pass to scipy.optimize.minimize function

0
fit_beta_kwargs dict, Optional

Keyword arguments to pass on to the estimation procedure. If different model for beta is desired, it can be defined here. For example: fit_beta_kwargs={ model=CoxPHFitter, # model object model_kwargs={}, # keywords arguments to pass on to the model instance initiation model_fit_kwargs={} # keywords arguments to pass on to model.fit() method }

{}
verbose int, Optional

The verbosity level of pandaallel

2
nb_workers int, Optional

The number of workers to pandaallel. If not sepcified, defaults to the number of workers available.

2

Returns:

Type Description
event_models (dict)

Fitted models dictionary. Keys - event names, Values - fitted models for the event.

References

[1] Meir, Tomer and Gorfine, Malka, "Discrete-time Competing-Risks Regression with or without Penalization", https://arxiv.org/abs/2303.01186

Source code in pydts/fitters.py
def fit(self,
        df: pd.DataFrame,
        covariates: Union[list, dict] = None,
        event_type_col: str = 'J',
        duration_col: str = 'X',
        pid_col: str = 'pid',
        skip_expansion: bool = False,
        x0: Union[np.array, int] = 0,
        fit_beta_kwargs: dict = {},
        verbose: int = 2,
        nb_workers: int = WORKERS) -> dict:
    """
    This method fits a model to the discrete data.

    Args:
        df (pd.DataFrame): training data for fitting the model
        covariates (list): list of covariates to be used in estimating the regression coefficients
        event_type_col (str): The event type column name (must be a column in df),
                              Right-censored sample (i) is indicated by event value 0, df.loc[i, event_type_col] = 0.
        duration_col (str): Last follow up time column name (must be a column in df).
        pid_col (str): Sample ID column name (must be a column in df).
        skip_expansion (boolean): Skips the dataframe expansion step. Use this option only if the provided dataframe (df) is already correctly expanded (see [1]).
                                  When set to True, the df is expected to be in the format produced by the pydts.utils.get_expanded_df() method, as if it were applied to the unexpanded data.
        x0 (Union[numpy.array, int], Optional): initial guess to pass to scipy.optimize.minimize function
        fit_beta_kwargs (dict, Optional): Keyword arguments to pass on to the estimation procedure.
                                          If different model for beta is desired, it can be defined here.
                                          For example:
                                          fit_beta_kwargs={
                                                model=CoxPHFitter, # model object
                                                model_kwargs={},  # keywords arguments to pass on to the model instance initiation
                                                model_fit_kwargs={}  # keywords arguments to pass on to model.fit() method
                                          }
        verbose (int, Optional): The verbosity level of pandaallel
        nb_workers (int, Optional): The number of workers to pandaallel. If not sepcified, defaults to the number of workers available.
    Returns:
        event_models (dict): Fitted models dictionary. Keys - event names, Values - fitted models for the event.

    References:
        [1] Meir, Tomer and Gorfine, Malka, "Discrete-time Competing-Risks Regression with or without Penalization", https://arxiv.org/abs/2303.01186
    """

    self._validate_cols(df, event_type_col, duration_col, pid_col)
    self.events = [c for c in sorted(df[event_type_col].unique()) if c != 0]
    if (covariates is not None):
        cov_not_in_df = []
        if isinstance(covariates, list):
            cov_not_in_df = [cov for cov in covariates if cov not in df.columns]
        elif isinstance(covariates, dict):
            for event in self.events:
                event_cov_not_in_df = [cov for cov in covariates[event] if cov not in df.columns]
                cov_not_in_df.extend(event_cov_not_in_df)
        if len(cov_not_in_df) > 0:
            raise ValueError(f"Error during fit - missing covariates from df: {cov_not_in_df}")

    #pandarallel.initialize(verbose=verbose, nb_workers=nb_workers)
    if covariates is None:
        covariates = [col for col in df if col not in [event_type_col, duration_col, pid_col]]
    self.covariates = covariates
    self.event_type_col = event_type_col
    self.duration_col = duration_col
    self.pid_col = pid_col
    self.times = sorted(df[duration_col].unique())

    if not skip_expansion:
        expanded_df = self._expand_data(df=df, event_type_col=event_type_col, duration_col=duration_col,
                                             pid_col=pid_col)
    else:
        print('Skipping data expansion step, only use this option if the provided dataframe (df) is already correctly expanded.')
        expanded_df = df

    self.beta_models = self._fit_beta(expanded_df, self.events, **fit_beta_kwargs)

    y_t = (df[duration_col]
           .value_counts()
           .sort_index(ascending=False)  # each event count for its occurring time and the times before
           .cumsum()
           .sort_index()
           )
    n_jt = df.groupby([event_type_col, duration_col]).size().to_frame().reset_index()
    n_jt.columns = [event_type_col, duration_col, 'n_jt']

    for event in self.events:

        n_et = n_jt[n_jt[event_type_col] == event].copy()

        if isinstance(self.beta_models[event], CoxPHFitter):
            self.beta_models_params_attr = 'params_'
            _res = Parallel(n_jobs=nb_workers)(delayed(minimize)(self._alpha_jt, x0=x0,
                                                                 args=(df, y_t.loc[row[duration_col]],
                                                                       getattr(self.beta_models[event],
                                                                               self.beta_models_params_attr),
                                                                       row['n_jt'],
                                                                       row[duration_col], event),
                                                                 method='BFGS',
                                                                 options={'gtol': 1e-7, 'eps': 1.5e-08,
                                                                          'maxiter': 200})
                                                                 for _, row in n_et.iterrows())
            n_et['success'] = Parallel(n_jobs=nb_workers)(delayed(lambda row: row.success)(val)
                                                          for val in _res)
            n_et['alpha_jt'] = Parallel(n_jobs=nb_workers)(delayed(lambda row: row.x[0])(val)
                                                           for val in _res)

        elif isinstance(self.beta_models[event], ConditionalResultsWrapper) or \
                isinstance(self.beta_models[event], RegularizedResultsWrapper):
            self.beta_models_params_attr = 'params'
            for idx, row in n_et.iterrows():
                _res = minimize(self._alpha_jt,
                                x0=x0,
                                args=(df,
                                      y_t.loc[row[duration_col]],
                                      getattr(self.beta_models[event], self.beta_models_params_attr),
                                      row['n_jt'],
                                      row[duration_col],
                                      event),
                                method='BFGS',
                                options={'gtol': 1e-7, 'eps': 1.5e-08, 'maxiter': 200})
                n_et.loc[idx, 'success'] = _res.success
                n_et.loc[idx, 'alpha_jt'] = _res.x[0]
        else:
            raise ValueError

        # n_et['opt_res'] = n_et.parallel_apply(lambda row: minimize(self._alpha_jt, x0=x0,
        #                         args=(df, y_t.loc[row[duration_col]], event_beta_params, row['n_jt'],
        #                         row[duration_col], event), method='BFGS',
        #                         options={'gtol': 1e-7, 'eps': 1.5e-08, 'maxiter': 200}), axis=1)
        # n_et['success'] = n_et['opt_res'].parallel_apply(lambda val: val.success)
        # n_et['alpha_jt'] = n_et['opt_res'].parallel_apply(lambda val: val.x[0])

        assert_fit(n_et, self.times[:-1], event_type_col=event_type_col, duration_col=duration_col)  # todo move basic input validation before any optimization
        self.event_models[event] = [self.beta_models[event], n_et]
        self.alpha_df = pd.concat([self.alpha_df, n_et], ignore_index=True)
    return self.event_models

get_alpha_df(self) ¤

This function returns the Alpha coefficients for all the events.

Returns:

Type Description
alpha_df (pandas.DataFrame)

Alpha coefficients Dataframe

Source code in pydts/fitters.py
def get_alpha_df(self):
    """
    This function returns the Alpha coefficients for all the events.

    Returns:
        alpha_df (pandas.DataFrame): Alpha coefficients Dataframe

    """

    alpha_df = pd.DataFrame()
    for event, model in self.event_models.items():
        model_alpha_df = model[1].set_index([self.event_type_col, self.duration_col])
        model_alpha_df.columns = pd.MultiIndex.from_product([[event], model_alpha_df.columns])
        alpha_df = pd.concat([alpha_df, model_alpha_df], axis=1)

    return alpha_df

get_beta_SE(self) ¤

This function returns the Beta coefficients and their Standard Errors for all the events.

Returns:

Type Description
se_df (pandas.DataFrame)

Beta coefficients and Standard Errors Dataframe

Source code in pydts/fitters.py
def get_beta_SE(self):
    """
    This function returns the Beta coefficients and their Standard Errors for all the events.

    Returns:
        se_df (pandas.DataFrame): Beta coefficients and Standard Errors Dataframe

    """
    se_df = pd.DataFrame()
    for event, model in self.beta_models.items():
        mdf = pd.concat([model.params_, model.standard_errors_], axis=1)
        mdf.columns = [f'j{event}_params', f'j{event}_SE']
        se_df = pd.concat([se_df, mdf], axis=1)
    return se_df

plot_all_events_alpha(self, ax=None, scatter_kwargs={}, colors=['tab:blue', 'tab:orange', 'tab:green', 'tab:red', 'tab:purple', 'tab:brown', 'tab:pink', 'tab:gray', 'tab:olive', 'tab:cyan'], show=True, title=None, xlabel='t', ylabel='$\\alpha_{jt}$', fontsize=18, ticklabelsize=15) ¤

This function plots a scatter plot of the $ alpha_{jt} $ coefficients of all the events.

Parameters:

Name Type Description Default
ax matplotlib.pyplot.Axes, Optional

ax to use

None
scatter_kwargs dict, Optional

keywords to pass to the scatter function

{}
colors list, Optional

colors names

['tab:blue', 'tab:orange', 'tab:green', 'tab:red', 'tab:purple', 'tab:brown', 'tab:pink', 'tab:gray', 'tab:olive', 'tab:cyan']
show bool, Optional

if to use plt.show()

True
title str, Optional

axes title

None
xlabel str, Optional

axes xlabel

't'
ylabel str, Optional

axes ylabel

'$\\alpha_{jt}$'
fontsize int, Optional

axes title, xlabel, ylabel fontsize

18

Returns:

Type Description
ax (matplotlib.pyplot.Axes)

output figure

Source code in pydts/fitters.py
def plot_all_events_alpha(self, ax: plt.Axes = None, scatter_kwargs: dict = {}, colors: list = COLORS,
                          show: bool = True, title: Union[str, None] = None, xlabel: str = 't',
                          ylabel: str = r'$\alpha_{jt}$', fontsize: int = 18, ticklabelsize: int = 15) -> plt.Axes:
    """
    This function plots a scatter plot of the $ alpha_{jt} $ coefficients of all the events.
    Args:
        ax (matplotlib.pyplot.Axes, Optional): ax to use
        scatter_kwargs (dict, Optional): keywords to pass to the scatter function
        colors (list, Optional): colors names
        show (bool, Optional): if to use plt.show()
        title (str, Optional): axes title
        xlabel (str, Optional): axes xlabel
        ylabel (str, Optional): axes ylabel
        fontsize (int, Optional): axes title, xlabel, ylabel fontsize

    Returns:
        ax (matplotlib.pyplot.Axes): output figure
    """
    if ax is None:
        fig, ax = plt.subplots(1, 1)
    ax.tick_params(axis='both', which='major', labelsize=ticklabelsize)
    ax.tick_params(axis='both', which='minor', labelsize=ticklabelsize)
    title = r'$\alpha_{jt}$' + f' for all events' if title is None else title
    for idx, (event, model) in enumerate(self.event_models.items()):
        label = f'{event}'
        color = colors[idx % len(colors)]
        self.plot_event_alpha(event=event, ax=ax, scatter_kwargs=scatter_kwargs, show=False, title=title,
                              ylabel=ylabel, xlabel=xlabel, fontsize=fontsize, label=label, color=color,
                              ticklabelsize=ticklabelsize)
    ax.legend()
    if show:
        plt.show()
    return ax

plot_all_events_beta(self, ax=None, colors=['tab:blue', 'tab:orange', 'tab:green', 'tab:red', 'tab:purple', 'tab:brown', 'tab:pink', 'tab:gray', 'tab:olive', 'tab:cyan'], show=True, title=None, xlabel='Value', ylabel='$\\beta_{j}$', fontsize=18, ticklabelsize=15) ¤

This function plots the $ beta_{j} $ coefficients and standard errors of all the events.

Parameters:

Name Type Description Default
ax matplotlib.pyplot.Axes, Optional

ax to use

None
colors list, Optional

colors names

['tab:blue', 'tab:orange', 'tab:green', 'tab:red', 'tab:purple', 'tab:brown', 'tab:pink', 'tab:gray', 'tab:olive', 'tab:cyan']
show bool, Optional

if to use plt.show()

True
title str, Optional

axes title

None
xlabel str, Optional

axes xlabel

'Value'
ylabel str, Optional

axes ylabel

'$\\beta_{j}$'
fontsize int, Optional

axes title, xlabel, ylabel fontsize

18
ticklabelsize int, Optional

axes xticklabels, yticklabels fontsize

15

Returns:

Type Description
ax (matplotlib.pyplot.Axes)

output figure

Source code in pydts/fitters.py
def plot_all_events_beta(self, ax: plt.Axes = None, colors: list = COLORS, show: bool = True,
                         title: Union[str, None] = None, xlabel: str = 'Value',  ylabel: str = r'$\beta_{j}$',
                         fontsize: int = 18, ticklabelsize: int = 15) -> plt.Axes:
    """
    This function plots the $ beta_{j} $ coefficients and standard errors of all the events.
    Args:
        ax (matplotlib.pyplot.Axes, Optional): ax to use
        colors (list, Optional): colors names
        show (bool, Optional): if to use plt.show()
        title (str, Optional): axes title
        xlabel (str, Optional): axes xlabel
        ylabel (str, Optional): axes ylabel
        fontsize (int, Optional): axes title, xlabel, ylabel fontsize
        ticklabelsize (int, Optional): axes xticklabels, yticklabels fontsize
    Returns:
        ax (matplotlib.pyplot.Axes): output figure
    """
    if ax is None:
        fig, ax = plt.subplots(1, 1)
    title = r'$\beta_{j}$' + f' for all events' if title is None else title
    ax.tick_params(axis='both', which='major', labelsize=ticklabelsize)
    ax.tick_params(axis='both', which='minor', labelsize=ticklabelsize)
    se_df = self.get_beta_SE()

    for idx, col in enumerate(se_df.columns):
        if idx % 2 == 1:
            continue
        y = np.arange((idx//2)*len(se_df), (1+(idx//2))*len(se_df))
        ax.errorbar(x=se_df.iloc[:, idx].values, y=y,
                   color=colors[idx % len(colors)], xerr=se_df.iloc[:, idx+1].values, label=f'{col}',
                   markersize=6, ls='', marker='o')

    yt = list(se_df.index) * (len(se_df.columns) // 2)
    ax.set_yticks(np.arange(0, len(yt)))
    ax.set_yticklabels(yt)
    ax.set_title(title, fontsize=fontsize)
    ax.set_xlabel(xlabel, fontsize=fontsize)
    ax.set_ylabel(ylabel, fontsize=fontsize)
    ax.grid()
    plt.gca().invert_yaxis()
    ax.legend()
    if show:
        plt.show()
    return ax

plot_event_alpha(self, event, ax=None, scatter_kwargs={}, show=True, title=None, xlabel='t', ylabel='$\\alpha_{jt}$', fontsize=18, color=None, label=None, ticklabelsize=15) ¤

This function plots a scatter plot of the $ alpha_{jt} $ coefficients of a specific event.

Parameters:

Name Type Description Default
event Union[str, int]

event name

required
ax matplotlib.pyplot.Axes, Optional

ax to use

None
scatter_kwargs dict, Optional

keywords to pass to the scatter function

{}
show bool, Optional

if to use plt.show()

True
title str, Optional

axes title

None
xlabel str, Optional

axes xlabel

't'
ylabel str, Optional

axes ylabel

'$\\alpha_{jt}$'
fontsize int, Optional

axes title, xlabel, ylabel fontsize

18
color str, Optional

color name to use

None
label str, Optional

label name

None

Returns:

Type Description
ax (matplotlib.pyplot.Axes)

output figure

Source code in pydts/fitters.py
def plot_event_alpha(self, event: Union[str, int], ax: plt.Axes = None, scatter_kwargs: dict = {},
                     show=True, title=None, xlabel='t', ylabel=r'$\alpha_{jt}$', fontsize=18,
                     color: str = None, label: str = None, ticklabelsize: int = 15) -> plt.Axes:
    """
    This function plots a scatter plot of the $ alpha_{jt} $ coefficients of a specific event.
    Args:
        event (Union[str, int]): event name
        ax (matplotlib.pyplot.Axes, Optional): ax to use
        scatter_kwargs (dict, Optional): keywords to pass to the scatter function
        show (bool, Optional): if to use plt.show()
        title (str, Optional): axes title
        xlabel (str, Optional): axes xlabel
        ylabel (str, Optional): axes ylabel
        fontsize (int, Optional): axes title, xlabel, ylabel fontsize
        color (str, Optional): color name to use
        label (str, Optional): label name
    Returns:
        ax (matplotlib.pyplot.Axes): output figure
    """

    assert event in self.events, f"Cannot plot event {event} alpha - it was not included during .fit()"

    if ax is None:
        fig, ax = plt.subplots(1, 1)
    ax.tick_params(axis='both', which='major', labelsize=ticklabelsize)
    ax.tick_params(axis='both', which='minor', labelsize=ticklabelsize)
    title = r'$\alpha_{jt}$' + f' for event {event}' if title is None else title
    label = f'{event}' if label is None else label
    color = 'tab:blue' if color is None else color
    alpha_df = self.event_models[event][1]
    ax.scatter(alpha_df[self.duration_col].values, alpha_df['alpha_jt'].values, label=label,
               color=color, **scatter_kwargs)
    ax.set_title(title, fontsize=fontsize)
    ax.set_xlabel(xlabel, fontsize=fontsize)
    ax.set_ylabel(ylabel, fontsize=fontsize)
    if show:
        plt.show()
    return ax

predict_hazard_jt(self, df, event, t) ¤

This method calculates the hazard for the given event at the given time values if they were included in the training set of the event.

Parameters:

Name Type Description Default
df pd.DataFrame

samples to predict for

required
event Union[str, int]

event name

required
t Union[Iterable, int]

times to calculate the hazard for

required

Returns:

Type Description
df (pd.DataFrame)

samples with the prediction columns

Source code in pydts/fitters.py
def predict_hazard_jt(self,
                      df: pd.DataFrame,
                      event: Union[str, int],
                      t: Union[Iterable, int]) -> pd.DataFrame:
    """
    This method calculates the hazard for the given event at the given time values if they were included in
    the training set of the event.

    Args:
        df (pd.DataFrame): samples to predict for
        event (Union[str, int]): event name
        t (Union[Iterable, int]): times to calculate the hazard for

    Returns:
        df (pd.DataFrame): samples with the prediction columns
    """
    self._validate_covariates_in_df(df.head())
    t = self._validate_t(t, return_iter=True)
    assert event in self.events, \
        f"Cannot predict for event {event} - it was not included during .fit()"

    model = self.event_models[event]
    alpha_df = model[1].set_index(self.duration_col)['alpha_jt'].copy()

    _t = np.array([t_i for t_i in t if (f'hazard_j{event}_t{t_i}' not in df.columns)])
    if len(_t) == 0:
        return df
    temp_df = df.copy()
    if isinstance(self.covariates, list):
        beta_j_x = temp_df[self.covariates].dot(getattr(model[0], self.beta_models_params_attr))
    elif isinstance(self.covariates, dict):
        beta_j_x = temp_df[self.covariates[event]].dot(getattr(model[0], self.beta_models_params_attr))
    temp_df[[f'hazard_j{event}_t{c}' for c in _t]] = pd.concat(
        [self._hazard_inverse_transformation(alpha_df[c] + beta_j_x) for c in _t], axis=1).values
    return temp_df

print_summary(self, summary_func='print_summary', summary_kwargs={}) ¤

This method prints the summary of the fitted models for all the events.

Parameters:

Name Type Description Default
summary_func str, Optional

print summary method of the fitted model type ("summary", "print_summary").

'print_summary'
summary_kwargs dict, Optional

Keyword arguments to pass to the model summary function.

{}

Returns:

Type Description
None

None

Source code in pydts/fitters.py
def print_summary(self,
                  summary_func: str = "print_summary",
                  summary_kwargs: dict = {}) -> None:
    """
    This method prints the summary of the fitted models for all the events.

    Args:
        summary_func (str, Optional): print summary method of the fitted model type ("summary", "print_summary").
        summary_kwargs (dict, Optional): Keyword arguments to pass to the model summary function.

    Returns:
        None
    """
    from IPython.display import display
    display(self.get_beta_SE())

    for event, model in self.event_models.items():
        print(f'\n\nModel summary for event: {event}')
        display(model[1].set_index([self.event_type_col, self.duration_col]))