# coding=utf-8
# Copyright 2024 The HuggingFace Inc. team.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Processor class for LLaVa-NeXT.
"""

from typing import Optional, Union

import numpy as np

from ...feature_extraction_utils import BatchFeature
from ...image_processing_utils import select_best_resolution
from ...image_utils import ImageInput, get_image_size, to_numpy_array
from ...processing_utils import (
    MultiModalData,
    ProcessingKwargs,
    ProcessorMixin,
    Unpack,
)
from ...tokenization_utils_base import PreTokenizedInput, TextInput
from ...utils import logging


logger = logging.get_logger(__name__)


class LlavaNextProcessorKwargs(ProcessingKwargs, total=False):
    _defaults = {
        "text_kwargs": {
            "padding": False,
            "return_mm_token_type_ids": False,
        },
        "images_kwargs": {
            "do_pad": True,
        },
    }


class LlavaNextProcessor(ProcessorMixin):
    r"""
    Constructs a LLaVa-NeXT processor which wraps a LLaVa-NeXT image processor and a LLaMa tokenizer into a single
    processor.

    [`LlavaNextProcessor`] offers all the functionalities of [`LlavaNextImageProcessor`] and [`LlamaTokenizerFast`].
    See the [`~LlavaNextProcessor.__call__`] and [`~LlavaNextProcessor.decode`] for more information.

    Args:
        image_processor ([`LlavaNextImageProcessor`], *optional*):
            The image processor is a required input.
        tokenizer ([`LlamaTokenizerFast`], *optional*):
            The tokenizer is a required input.
        patch_size (`int`, *optional*):
            Patch size from the vision tower.
        vision_feature_select_strategy (`str`, *optional*):
            The feature selection strategy used to select the vision feature from the vision backbone.
            Should be the same as in the model's config.
        chat_template (`str`, *optional*):
            A Jinja template which will be used to convert lists of messages in a chat into a tokenizable string.
        image_token (`str`, *optional*, defaults to `"<image>"`):
            Special token used to denote image location.
        num_additional_image_tokens (`int`, *optional*, defaults to 0):
            Number of additional tokens added to the image embeddings, such as CLS (+1). If the backbone has no CLS
            or other extra tokens appended, no need to set this arg.
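
    Example (a minimal usage sketch; the checkpoint name, image URL, and prompt format are illustrative and
    loading them requires network access):

    ```python
    >>> from PIL import Image
    >>> import requests
    >>> from transformers import LlavaNextProcessor

    >>> processor = LlavaNextProcessor.from_pretrained("llava-hf/llava-v1.6-mistral-7b-hf")
    >>> url = "http://images.cocodataset.org/val2017/000000039769.jpg"
    >>> image = Image.open(requests.get(url, stream=True).raw)
    >>> prompt = "USER: <image>\nWhat is shown in this image? ASSISTANT:"
    >>> inputs = processor(images=image, text=prompt, return_tensors="pt")
    ```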
""" attributes = ["image_processor", "tokenizer"] image_processor_class = "AutoImageProcessor" tokenizer_class = "AutoTokenizer" def __init__( self, image_processor=None, tokenizer=None, patch_size=None, vision_feature_select_strategy=None, chat_template=None, image_token="", # set the default and let users change if they have peculiar special tokens in rare cases num_additional_image_tokens=0, **kwargs, ): self.patch_size = patch_size self.num_additional_image_tokens = num_additional_image_tokens self.vision_feature_select_strategy = vision_feature_select_strategy self.image_token = tokenizer.image_token if hasattr(tokenizer, "image_token") else image_token self.image_token_id = ( tokenizer.image_token_id if getattr(tokenizer, "image_token_id", None) else tokenizer.convert_tokens_to_ids(self.image_token) ) super().__init__(image_processor, tokenizer, chat_template=chat_template) def __call__( self, images: Optional[ImageInput] = None, text: Union[TextInput, PreTokenizedInput, list[TextInput], list[PreTokenizedInput]] = None, audio=None, videos=None, **kwargs: Unpack[LlavaNextProcessorKwargs], ) -> BatchFeature: """ Main method to prepare for the model one or several sequences(s) and image(s). This method forwards the `text` and `kwargs` arguments to LlamaTokenizerFast's [`~LlamaTokenizerFast.__call__`] if `text` is not `None` to encode the text. To prepare the image(s), this method forwards the `images` and `kwargs` arguments to LlavaNextImageProcessor's [`~LlavaNextImageProcessor.__call__`] if `images` is not `None`. Please refer to the docstring of the above two methods for more information. Args: images (`PIL.Image.Image`, `np.ndarray`, `torch.Tensor`, `list[PIL.Image.Image]`, `list[np.ndarray]`, `list[torch.Tensor]`): The image or batch of images to be prepared. Each image can be a PIL image, NumPy array or PyTorch tensor. Both channels-first and channels-last formats are supported. text (`str`, `list[str]`, `list[list[str]]`): The sequence or batch of sequences to be encoded. Each sequence can be a string or a list of strings (pretokenized string). If the sequences are provided as list of strings (pretokenized), you must set `is_split_into_words=True` (to lift the ambiguity with a batch of sequences). Returns: [`BatchFeature`]: A [`BatchFeature`] with the following fields: - **input_ids** -- List of token ids to be fed to a model. Returned when `text` is not `None`. - **attention_mask** -- List of indices specifying which tokens should be attended to by the model (when `return_attention_mask=True` or if *"attention_mask"* is in `self.model_input_names` and if `text` is not `None`). - **pixel_values** -- Pixel values to be fed to a model. Returned when `images` is not `None`. """ if images is None and text is None: raise ValueError("You have to specify at least images or text.") output_kwargs = self._merge_kwargs( LlavaNextProcessorKwargs, tokenizer_init_kwargs=self.tokenizer.init_kwargs, **kwargs, ) if images is not None: image_inputs = self.image_processor(images, **output_kwargs["images_kwargs"]) else: image_inputs = {} if isinstance(text, str): text = [text] elif not isinstance(text, list) and not isinstance(text[0], str): raise TypeError("Invalid input text. 
        prompt_strings = text
        if image_inputs:
            image_sizes = iter(image_inputs["image_sizes"])
            height, width = get_image_size(to_numpy_array(image_inputs["pixel_values"][0][0]))
            prompt_strings = []
            for sample in text:
                while self.image_token in sample:
                    image_size = next(image_sizes)
                    if not isinstance(image_size, (list, tuple)):
                        # cast to list to avoid numerical precision errors when calculating unpadding
                        image_size = image_size.tolist()
                    orig_height, orig_width = image_size
                    num_image_tokens = self._get_number_of_features(orig_height, orig_width, height, width)
                    if self.vision_feature_select_strategy == "default":
                        num_image_tokens -= 1
                    sample = sample.replace(self.image_token, "<placeholder>" * num_image_tokens, 1)
                prompt_strings.append(sample)
            prompt_strings = [sample.replace("<placeholder>", self.image_token) for sample in prompt_strings]

        return_tensors = output_kwargs["text_kwargs"].pop("return_tensors", None)
        return_mm_token_type_ids = output_kwargs["text_kwargs"].pop("return_mm_token_type_ids", None)
        text_inputs = self.tokenizer(prompt_strings, **output_kwargs["text_kwargs"])
        self._check_special_mm_tokens(prompt_strings, text_inputs, modalities=["image"])

        if return_mm_token_type_ids:
            array_ids = np.array(text_inputs["input_ids"])
            mm_token_type_ids = np.zeros_like(text_inputs["input_ids"])
            mm_token_type_ids[array_ids == self.image_token_id] = 1
            text_inputs["mm_token_type_ids"] = mm_token_type_ids.tolist()

        return BatchFeature(data={**text_inputs, **image_inputs}, tensor_type=return_tensors)

    def _get_number_of_features(self, orig_height: int, orig_width: int, height: int, width: int) -> int:
        image_grid_pinpoints = self.image_processor.image_grid_pinpoints

        height_best_resolution, width_best_resolution = select_best_resolution(
            [orig_height, orig_width], image_grid_pinpoints
        )
        scale_height, scale_width = height_best_resolution // height, width_best_resolution // width

        patches_height = height // self.patch_size
        patches_width = width // self.patch_size
        unpadded_features, newline_features = self._get_unpadded_features(
            orig_height, orig_width, patches_height, patches_width, scale_height, scale_width
        )
        # The base patch covers the entire image (+1 for the CLS)
        base_features = patches_height * patches_width + self.num_additional_image_tokens
        num_image_tokens = unpadded_features + newline_features + base_features
        return num_image_tokens

    def _get_unpadded_features(self, height, width, patches_height, patches_width, scale_height, scale_width):
        """
        Get number of features for a given image with height/width. LLaVA-NeXT is different from LLaVA because it
        divides each image into patches depending on its resolution. Therefore we need to calculate how many patches
        an image is divided into and get the number of features from that.
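
        Worked example (illustrative numbers): for an original image of height 200 and width 600 mapped to a best
        resolution of 336x672 (`scale_height=1`, `scale_width=2`) with a 24x24 base patch grid, the high-resolution
        grid is 24x48. The original aspect ratio (3.0) exceeds the grid's (2.0), so 4 rows of padding are removed
        from the top and bottom, leaving a 16x48 grid: 16 * 48 = 768 unpadded features plus 16 newline features
        (one per remaining row).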
""" current_height = patches_height * scale_height current_width = patches_width * scale_width original_aspect_ratio = width / height current_aspect_ratio = current_width / current_height if original_aspect_ratio > current_aspect_ratio: new_height = int(round(height * (current_width / width), 7)) padding = (current_height - new_height) // 2 current_height -= padding * 2 else: new_width = int(round(width * (current_height / height), 7)) padding = (current_width - new_width) // 2 current_width -= padding * 2 unpadded_features = current_height * current_width newline_features = current_height return (unpadded_features, newline_features) def _get_num_multimodal_tokens(self, image_sizes=None, **kwargs): """ Computes the number of placeholder tokens needed for multimodal inputs with the given sizes. Args: image_sizes (list[list[str]], *optional*): The input sizes formatted as (height, width) per each image. Returns: `MultiModalData`: A `MultiModalData` object holding number of tokens per each of the provided input modalities, along with other useful data. """ vision_data = {} if image_sizes is not None: images_kwargs = LlavaNextProcessorKwargs._defaults.get("images_kwargs", {}) images_kwargs.update(kwargs) size = images_kwargs.get("size", None) or self.image_processor.size size = ( (size["shortest_edge"], size["shortest_edge"]) if "shortest_edge" in size else (min(size["height"], size["width"]), min(size["height"], size["width"])) ) processed_height, processed_width = size batch_num_image_tokens = [] num_image_patches = [1] * len(image_sizes) # llava-next doesn't batch pixels as Idefics, thus `1` patch` for image_size in image_sizes: orig_height, orig_width = image_size num_image_tokens = self._get_number_of_features( orig_height, orig_width, processed_height, processed_width ) if self.vision_feature_select_strategy == "default": num_image_tokens -= 1 batch_num_image_tokens.append(num_image_tokens) vision_data.update({"num_image_tokens": batch_num_image_tokens, "num_image_patches": num_image_patches}) return MultiModalData(**vision_data) __all__ = ["LlavaNextProcessor"]