Module src.models.utils

Expand source code
# import numpy as np
# from sklearn.compose import TransformedTargetRegressor
# from sklearn.multioutput import MultiOutputRegressor
# from sklearn.decomposition import PCA
# from sklearn.utils._joblib import Parallel, delayed
# from sklearn.utils import check_array
# from sklearn.utils.fixes import parallel_helper
# from sklearn.utils.validation import check_is_fitted
# from sklearn.utils import gen_batches



# class PCATargetTransform(TransformedTargetRegressor):
#     def __init__(self, regressor=None, n_components=10):

#         # PCA transform
#         transformer = PCA(n_components=n_components)

#         # Initialize Target Transform with PCA
#         super(PCATargetTransform, self).__init__(
#             regressor=regressor,
#             transformer=transformer,
#             func=None,
#             inverse_func=None,
#             check_inverse=False,
#         )
#         self.n_components = n_components

#     def predict(self, X, return_std=False):
#         """Predict using the base regressor, applying inverse.
#         The regressor is used to predict and the ``inverse_func`` or
#         ``inverse_transform`` is applied before returning the prediction.
#         Parameters
#         ----------
#         X : {array-like, sparse matrix}, shape = (n_samples, n_features)
#             Samples.
#         Returns
#         -------
#         y_hat : array, shape = (n_samples,)
#             Predicted values.
#         """
#         check_is_fitted(self, "regressor_")

#         # Return standard deviation
#         if return_std:
#             try:
#                 pred, std = self.regressor_.predict(X, return_std=return_std)
#             except:
#                 pred = self.regressor_.predict(X)
#         else:
#             pred = self.regressor_.predict(X)

#         # Inverse Transformation
#         if pred.ndim == 1:
#             pred_trans = self.transformer_.inverse_transform(pred.reshape(-1, 1))
#         else:
#             pred_trans = self.transformer_.inverse_transform(pred)

#         # Checks
#         if (
#             self._training_dim == 1
#             and pred_trans.ndim == 2
#             and pred_trans.shape[1] == 1
#         ):
#             pred_trans = pred_trans.squeeze(axis=1)

#         if return_std:
#             return pred_trans, std
#         else:
#             return pred_trans


# class MultiTaskGP(MultiOutputRegressor):
#     def predict(self, X, return_std=False):
#         """Predict multi-output variable using a model
#          trained for each target variable.
#         Parameters
#         ----------
#         X : (sparse) array-like, shape (n_samples, n_features)
#             Data.
#         Returns
#         -------
#         y : (sparse) array-like, shape (n_samples, n_outputs)
#             Multi-output targets predicted across multiple predictors.
#             Note: Separate models are generated for each predictor.
#         """
#         check_is_fitted(self, "estimators_")
#         if not hasattr(self.estimator, "predict"):
#             raise ValueError("The base estimator should implement" " a predict method")

#         X = check_array(X, accept_sparse=True)

#         if not return_std:
#             preds = Parallel(n_jobs=self.n_jobs)(
#                 delayed(parallel_helper)(e, "predict", X, return_std=False)
#                 for e in self.estimators_
#             )

#             return np.asarray(preds).T
#         else:
#             results = Parallel(n_jobs=self.n_jobs)(
#                 delayed(parallel_helper)(e, "predict", X, return_std=True)
#                 for e in self.estimators_
#             )
#             preds, stds = tuple(zip(*results))
#             # print(preds.shape, stds.shape)
#             return np.asarray(preds).T, np.asarray(stds).T


# # def batch_predict(Xs, model, batchsize=1000):
# #     ms, vs = [], []
# #     n = max(len(Xs) / batchsize, 1)  # predict in small batches
# #     for xs in np.array_split(Xs, n):
# #         m, v = model.predict(xs)

# #         ms.append(m)
# #         vs.append(v)

# #     return (
# #         np.concatenate(ms, 1),
# #         np.concatenate(vs, 1),
# #     )  # num_posterior_samples, N_test, D_y


# def batch_predict(X, model, n_jobs=2, batch_size=100, verbose=1):

#     X = check_array(X, accept_sparse=False)

#     if n_jobs == 1:
#         ms = []
#         for (start, end) in gen_batches(X.shape[0], batch_size=batch_size):
#             m = model.predict(X[start:end])
#             ms.append(m)
#         return np.concatenate(ms, 1)
#     elif n_jobs > 1:
#         # print("return std")
#         results = Parallel(n_jobs=n_jobs, verbose=verbose)(
#             delayed(parallel_helper)(model, "predict", X[start:end])
#             for (start, end) in gen_batches(X.shape[0], batch_size=batch_size)
#         )
#         preds = tuple(zip(*results))

#         return np.asarray(preds).T
#     else:
#         raise ValueError(f"n_jobs '{n_jobs}' should be greater than or equal to 1.")


# def batch_predict_bayesian(X, model, n_jobs=2, batch_size=100, verbose=1):

#     X = check_array(X, accept_sparse=False)

#     if n_jobs == 1:
#         ms, vs = [], []
#         for (start, end) in gen_batches(X.shape[0], batch_size=batch_size):
#             m, v = model.predict(X[start:end], return_std=True)
#             ms.append(m)
#             vs.append(v)
#         return np.concatenate(ms, 1), np.concatenate(vs, 1)
#     elif n_jobs > 1:
#         # print("return std")
#         results = Parallel(n_jobs=n_jobs, verbose=verbose)(
#             delayed(parallel_helper)(model, "predict", X[start:end], return_std=True)
#             for (start, end) in gen_batches(X.shape[0], batch_size=batch_size)
#         )
#         preds, stds = tuple(zip(*results))

#         return np.asarray(preds).T, np.asarray(stds).T
#     else:
#         raise ValueError(f"n_jobs '{n_jobs}' should be greater than or equal to 1.")


# #     # Perform parallel predictions using joblib
# #     results = Parallel(n_jobs=n_jobs, verbose=verbose)(
# #         delayed(gp_model_predictions)(
# #             gp_model, x[start:end],
# #             return_variance=return_variance,)
# #         for (start, end) in generate_batches(x.shape[0], batch_size=batch_size)

# #     )

# #     # Aggregate results (predictions, derivatives, variances)
# #     predictions, variance = tuple(zip(*results))
# #     predictions = np.hstack(predictions)
# #     variance = np.hstack(variance)


# #     if return_variance and return_derivative:
# #         return predictions[:, np.newaxis], derivative, variance
# #     elif return_variance:
# #         return predictions[:, np.newaxis], variance
# #     elif return_derivative:
# #         return predictions[:, np.newaxis], derivative
# #     else:
# # return predictions[:, np.newaxis]