import warnings from numbers import Integral, Real from warnings import warn import numpy as np from ..base import ( BaseEstimator, ClassifierMixin, MetaEstimatorMixin, _fit_context, clone, ) from ..utils import Bunch, get_tags, safe_mask from ..utils._param_validation import HasMethods, Hidden, Interval, StrOptions from ..utils.metadata_routing import ( MetadataRouter, MethodMapping, _raise_for_params, _routing_enabled, process_routing, ) from ..utils.metaestimators import available_if from ..utils.validation import _estimator_has, check_is_fitted, validate_data __all__ = ["SelfTrainingClassifier"] # Authors: The scikit-learn developers # SPDX-License-Identifier: BSD-3-Clause class SelfTrainingClassifier(ClassifierMixin, MetaEstimatorMixin, BaseEstimator): """Self-training classifier. This :term:`metaestimator` allows a given supervised classifier to function as a semi-supervised classifier, allowing it to learn from unlabeled data. It does this by iteratively predicting pseudo-labels for the unlabeled data and adding them to the training set. The classifier will continue iterating until either max_iter is reached, or no pseudo-labels were added to the training set in the previous iteration. Read more in the :ref:`User Guide `. Parameters ---------- estimator : estimator object An estimator object implementing `fit` and `predict_proba`. Invoking the `fit` method will fit a clone of the passed estimator, which will be stored in the `estimator_` attribute. .. versionadded:: 1.6 `estimator` was added to replace `base_estimator`. base_estimator : estimator object An estimator object implementing `fit` and `predict_proba`. Invoking the `fit` method will fit a clone of the passed estimator, which will be stored in the `estimator_` attribute. .. deprecated:: 1.6 `base_estimator` was deprecated in 1.6 and will be removed in 1.8. Use `estimator` instead. threshold : float, default=0.75 The decision threshold for use with `criterion='threshold'`. Should be in [0, 1). When using the `'threshold'` criterion, a :ref:`well calibrated classifier ` should be used. criterion : {'threshold', 'k_best'}, default='threshold' The selection criterion used to select which labels to add to the training set. If `'threshold'`, pseudo-labels with prediction probabilities above `threshold` are added to the dataset. If `'k_best'`, the `k_best` pseudo-labels with highest prediction probabilities are added to the dataset. When using the 'threshold' criterion, a :ref:`well calibrated classifier ` should be used. k_best : int, default=10 The amount of samples to add in each iteration. Only used when `criterion='k_best'`. max_iter : int or None, default=10 Maximum number of iterations allowed. Should be greater than or equal to 0. If it is `None`, the classifier will continue to predict labels until no new pseudo-labels are added, or all unlabeled samples have been labeled. verbose : bool, default=False Enable verbose output. Attributes ---------- estimator_ : estimator object The fitted estimator. classes_ : ndarray or list of ndarray of shape (n_classes,) Class labels for each output. (Taken from the trained `estimator_`). transduction_ : ndarray of shape (n_samples,) The labels used for the final fit of the classifier, including pseudo-labels added during fit. labeled_iter_ : ndarray of shape (n_samples,) The iteration in which each sample was labeled. When a sample has iteration 0, the sample was already labeled in the original dataset. When a sample has iteration -1, the sample was not labeled in any iteration. n_features_in_ : int Number of features seen during :term:`fit`. .. versionadded:: 0.24 feature_names_in_ : ndarray of shape (`n_features_in_`,) Names of features seen during :term:`fit`. Defined only when `X` has feature names that are all strings. .. versionadded:: 1.0 n_iter_ : int The number of rounds of self-training, that is the number of times the base estimator is fitted on relabeled variants of the training set. termination_condition_ : {'max_iter', 'no_change', 'all_labeled'} The reason that fitting was stopped. - `'max_iter'`: `n_iter_` reached `max_iter`. - `'no_change'`: no new labels were predicted. - `'all_labeled'`: all unlabeled samples were labeled before `max_iter` was reached. See Also -------- LabelPropagation : Label propagation classifier. LabelSpreading : Label spreading model for semi-supervised learning. References ---------- :doi:`David Yarowsky. 1995. Unsupervised word sense disambiguation rivaling supervised methods. In Proceedings of the 33rd annual meeting on Association for Computational Linguistics (ACL '95). Association for Computational Linguistics, Stroudsburg, PA, USA, 189-196. <10.3115/981658.981684>` Examples -------- >>> import numpy as np >>> from sklearn import datasets >>> from sklearn.semi_supervised import SelfTrainingClassifier >>> from sklearn.svm import SVC >>> rng = np.random.RandomState(42) >>> iris = datasets.load_iris() >>> random_unlabeled_points = rng.rand(iris.target.shape[0]) < 0.3 >>> iris.target[random_unlabeled_points] = -1 >>> svc = SVC(probability=True, gamma="auto") >>> self_training_model = SelfTrainingClassifier(svc) >>> self_training_model.fit(iris.data, iris.target) SelfTrainingClassifier(...) """ _parameter_constraints: dict = { # We don't require `predic_proba` here to allow passing a meta-estimator # that only exposes `predict_proba` after fitting. # TODO(1.8) remove None option "estimator": [None, HasMethods(["fit"])], # TODO(1.8) remove "base_estimator": [ HasMethods(["fit"]), Hidden(StrOptions({"deprecated"})), ], "threshold": [Interval(Real, 0.0, 1.0, closed="left")], "criterion": [StrOptions({"threshold", "k_best"})], "k_best": [Interval(Integral, 1, None, closed="left")], "max_iter": [Interval(Integral, 0, None, closed="left"), None], "verbose": ["verbose"], } def __init__( self, estimator=None, base_estimator="deprecated", threshold=0.75, criterion="threshold", k_best=10, max_iter=10, verbose=False, ): self.estimator = estimator self.threshold = threshold self.criterion = criterion self.k_best = k_best self.max_iter = max_iter self.verbose = verbose # TODO(1.8) remove self.base_estimator = base_estimator def _get_estimator(self): """Get the estimator. Returns ------- estimator_ : estimator object The cloned estimator object. """ # TODO(1.8): remove and only keep clone(self.estimator) if self.estimator is None and self.base_estimator != "deprecated": estimator_ = clone(self.base_estimator) warn( ( "`base_estimator` has been deprecated in 1.6 and will be removed" " in 1.8. Please use `estimator` instead." ), FutureWarning, ) # TODO(1.8) remove elif self.estimator is None and self.base_estimator == "deprecated": raise ValueError( "You must pass an estimator to SelfTrainingClassifier. Use `estimator`." ) elif self.estimator is not None and self.base_estimator != "deprecated": raise ValueError( "You must pass only one estimator to SelfTrainingClassifier." " Use `estimator`." ) else: estimator_ = clone(self.estimator) return estimator_ @_fit_context( # SelfTrainingClassifier.estimator is not validated yet prefer_skip_nested_validation=False ) def fit(self, X, y, **params): """ Fit self-training classifier using `X`, `y` as training data. Parameters ---------- X : {array-like, sparse matrix} of shape (n_samples, n_features) Array representing the data. y : {array-like, sparse matrix} of shape (n_samples,) Array representing the labels. Unlabeled samples should have the label -1. **params : dict Parameters to pass to the underlying estimators. .. versionadded:: 1.6 Only available if `enable_metadata_routing=True`, which can be set by using ``sklearn.set_config(enable_metadata_routing=True)``. See :ref:`Metadata Routing User Guide ` for more details. Returns ------- self : object Fitted estimator. """ _raise_for_params(params, self, "fit") self.estimator_ = self._get_estimator() # we need row slicing support for sparse matrices, but costly finiteness check # can be delegated to the base estimator. X, y = validate_data( self, X, y, accept_sparse=["csr", "csc", "lil", "dok"], ensure_all_finite=False, ) if y.dtype.kind in ["U", "S"]: raise ValueError( "y has dtype string. If you wish to predict on " "string targets, use dtype object, and use -1" " as the label for unlabeled samples." ) has_label = y != -1 if np.all(has_label): warnings.warn("y contains no unlabeled samples", UserWarning) if self.criterion == "k_best" and ( self.k_best > X.shape[0] - np.sum(has_label) ): warnings.warn( ( "k_best is larger than the amount of unlabeled " "samples. All unlabeled samples will be labeled in " "the first iteration" ), UserWarning, ) if _routing_enabled(): routed_params = process_routing(self, "fit", **params) else: routed_params = Bunch(estimator=Bunch(fit={})) self.transduction_ = np.copy(y) self.labeled_iter_ = np.full_like(y, -1) self.labeled_iter_[has_label] = 0 self.n_iter_ = 0 while not np.all(has_label) and ( self.max_iter is None or self.n_iter_ < self.max_iter ): self.n_iter_ += 1 self.estimator_.fit( X[safe_mask(X, has_label)], self.transduction_[has_label], **routed_params.estimator.fit, ) # Predict on the unlabeled samples prob = self.estimator_.predict_proba(X[safe_mask(X, ~has_label)]) pred = self.estimator_.classes_[np.argmax(prob, axis=1)] max_proba = np.max(prob, axis=1) # Select new labeled samples if self.criterion == "threshold": selected = max_proba > self.threshold else: n_to_select = min(self.k_best, max_proba.shape[0]) if n_to_select == max_proba.shape[0]: selected = np.ones_like(max_proba, dtype=bool) else: # NB these are indices, not a mask selected = np.argpartition(-max_proba, n_to_select)[:n_to_select] # Map selected indices into original array selected_full = np.nonzero(~has_label)[0][selected] # Add newly labeled confident predictions to the dataset self.transduction_[selected_full] = pred[selected] has_label[selected_full] = True self.labeled_iter_[selected_full] = self.n_iter_ if selected_full.shape[0] == 0: # no changed labels self.termination_condition_ = "no_change" break if self.verbose: print( f"End of iteration {self.n_iter_}," f" added {selected_full.shape[0]} new labels." ) if self.n_iter_ == self.max_iter: self.termination_condition_ = "max_iter" if np.all(has_label): self.termination_condition_ = "all_labeled" self.estimator_.fit( X[safe_mask(X, has_label)], self.transduction_[has_label], **routed_params.estimator.fit, ) self.classes_ = self.estimator_.classes_ return self @available_if(_estimator_has("predict")) def predict(self, X, **params): """Predict the classes of `X`. Parameters ---------- X : {array-like, sparse matrix} of shape (n_samples, n_features) Array representing the data. **params : dict of str -> object Parameters to pass to the underlying estimator's ``predict`` method. .. versionadded:: 1.6 Only available if `enable_metadata_routing=True`, which can be set by using ``sklearn.set_config(enable_metadata_routing=True)``. See :ref:`Metadata Routing User Guide ` for more details. Returns ------- y : ndarray of shape (n_samples,) Array with predicted labels. """ check_is_fitted(self) _raise_for_params(params, self, "predict") if _routing_enabled(): # metadata routing is enabled. routed_params = process_routing(self, "predict", **params) else: routed_params = Bunch(estimator=Bunch(predict={})) X = validate_data( self, X, accept_sparse=True, ensure_all_finite=False, reset=False, ) return self.estimator_.predict(X, **routed_params.estimator.predict) @available_if(_estimator_has("predict_proba")) def predict_proba(self, X, **params): """Predict probability for each possible outcome. Parameters ---------- X : {array-like, sparse matrix} of shape (n_samples, n_features) Array representing the data. **params : dict of str -> object Parameters to pass to the underlying estimator's ``predict_proba`` method. .. versionadded:: 1.6 Only available if `enable_metadata_routing=True`, which can be set by using ``sklearn.set_config(enable_metadata_routing=True)``. See :ref:`Metadata Routing User Guide ` for more details. Returns ------- y : ndarray of shape (n_samples, n_features) Array with prediction probabilities. """ check_is_fitted(self) _raise_for_params(params, self, "predict_proba") if _routing_enabled(): # metadata routing is enabled. routed_params = process_routing(self, "predict_proba", **params) else: routed_params = Bunch(estimator=Bunch(predict_proba={})) X = validate_data( self, X, accept_sparse=True, ensure_all_finite=False, reset=False, ) return self.estimator_.predict_proba(X, **routed_params.estimator.predict_proba) @available_if(_estimator_has("decision_function")) def decision_function(self, X, **params): """Call decision function of the `estimator`. Parameters ---------- X : {array-like, sparse matrix} of shape (n_samples, n_features) Array representing the data. **params : dict of str -> object Parameters to pass to the underlying estimator's ``decision_function`` method. .. versionadded:: 1.6 Only available if `enable_metadata_routing=True`, which can be set by using ``sklearn.set_config(enable_metadata_routing=True)``. See :ref:`Metadata Routing User Guide ` for more details. Returns ------- y : ndarray of shape (n_samples, n_features) Result of the decision function of the `estimator`. """ check_is_fitted(self) _raise_for_params(params, self, "decision_function") if _routing_enabled(): # metadata routing is enabled. routed_params = process_routing(self, "decision_function", **params) else: routed_params = Bunch(estimator=Bunch(decision_function={})) X = validate_data( self, X, accept_sparse=True, ensure_all_finite=False, reset=False, ) return self.estimator_.decision_function( X, **routed_params.estimator.decision_function ) @available_if(_estimator_has("predict_log_proba")) def predict_log_proba(self, X, **params): """Predict log probability for each possible outcome. Parameters ---------- X : {array-like, sparse matrix} of shape (n_samples, n_features) Array representing the data. **params : dict of str -> object Parameters to pass to the underlying estimator's ``predict_log_proba`` method. .. versionadded:: 1.6 Only available if `enable_metadata_routing=True`, which can be set by using ``sklearn.set_config(enable_metadata_routing=True)``. See :ref:`Metadata Routing User Guide ` for more details. Returns ------- y : ndarray of shape (n_samples, n_features) Array with log prediction probabilities. """ check_is_fitted(self) _raise_for_params(params, self, "predict_log_proba") if _routing_enabled(): # metadata routing is enabled. routed_params = process_routing(self, "predict_log_proba", **params) else: routed_params = Bunch(estimator=Bunch(predict_log_proba={})) X = validate_data( self, X, accept_sparse=True, ensure_all_finite=False, reset=False, ) return self.estimator_.predict_log_proba( X, **routed_params.estimator.predict_log_proba ) @available_if(_estimator_has("score")) def score(self, X, y, **params): """Call score on the `estimator`. Parameters ---------- X : {array-like, sparse matrix} of shape (n_samples, n_features) Array representing the data. y : array-like of shape (n_samples,) Array representing the labels. **params : dict of str -> object Parameters to pass to the underlying estimator's ``score`` method. .. versionadded:: 1.6 Only available if `enable_metadata_routing=True`, which can be set by using ``sklearn.set_config(enable_metadata_routing=True)``. See :ref:`Metadata Routing User Guide ` for more details. Returns ------- score : float Result of calling score on the `estimator`. """ check_is_fitted(self) _raise_for_params(params, self, "score") if _routing_enabled(): # metadata routing is enabled. routed_params = process_routing(self, "score", **params) else: routed_params = Bunch(estimator=Bunch(score={})) X = validate_data( self, X, accept_sparse=True, ensure_all_finite=False, reset=False, ) return self.estimator_.score(X, y, **routed_params.estimator.score) def get_metadata_routing(self): """Get metadata routing of this object. Please check :ref:`User Guide ` on how the routing mechanism works. .. versionadded:: 1.6 Returns ------- routing : MetadataRouter A :class:`~sklearn.utils.metadata_routing.MetadataRouter` encapsulating routing information. """ router = MetadataRouter(owner=self.__class__.__name__) router.add( estimator=self.estimator, method_mapping=( MethodMapping() .add(callee="fit", caller="fit") .add(callee="score", caller="fit") .add(callee="predict", caller="predict") .add(callee="predict_proba", caller="predict_proba") .add(callee="decision_function", caller="decision_function") .add(callee="predict_log_proba", caller="predict_log_proba") .add(callee="score", caller="score") ), ) return router def __sklearn_tags__(self): tags = super().__sklearn_tags__() # TODO(1.8): remove the condition check together with base_estimator if self.estimator is not None: tags.input_tags.sparse = get_tags(self.estimator).input_tags.sparse return tags