# coding=utf-8
# Copyright 2024 the HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""PyTorch Qwen2Audio model."""

import math
from dataclasses import dataclass
from typing import Callable, Optional, Union

import torch
from torch import nn

from ...activations import ACT2FN
from ...cache_utils import Cache
from ...generation import GenerationMixin
from ...modeling_layers import GradientCheckpointingLayer
from ...modeling_outputs import BaseModelOutput, ModelOutput
from ...modeling_utils import ALL_ATTENTION_FUNCTIONS, PreTrainedModel
from ...utils import auto_docstring, logging
from ..auto import AutoModel, AutoModelForCausalLM
from .configuration_qwen2_audio import Qwen2AudioConfig, Qwen2AudioEncoderConfig


logger = logging.get_logger(__name__)


@dataclass
@auto_docstring(
    custom_intro="""
    Base class for Qwen2Audio causal language model (or autoregressive) outputs.
    """
)
class Qwen2AudioCausalLMOutputWithPast(ModelOutput):
    r"""
    loss (`torch.FloatTensor` of shape `(1,)`, *optional*, returned when `labels` is provided):
        Language modeling loss (for next-token prediction).
    logits (`torch.FloatTensor` of shape `(batch_size, sequence_length, config.vocab_size)`):
        Prediction scores of the language modeling head (scores for each vocabulary token before SoftMax).
    past_key_values (`Cache`, *optional*, returned when `use_cache=True` is passed or when `config.use_cache=True`):
        Pre-computed hidden-states that can be used to speed up auto-regressive (sequential) decoding. There are
        two sets of pre-computed hidden-states: key and values states in the self-attention blocks. The
        `past_key_values` are returned when `use_cache=True` is passed or when `config.use_cache=True`. It is a
        [`~cache_utils.Cache`] instance.

        If `past_key_values` are used, the user can optionally input only the last `input_ids` (those that don't
        have their past key value states given to this model) of shape `(batch_size, 1)` instead of all
        `input_ids` of shape `(batch_size, sequence_length)`.
    attention_mask (`torch.FloatTensor`, *optional*):
        Attention mask, used to update the attention mask and position_ids.
""" loss: Optional[torch.FloatTensor] = None logits: Optional[torch.FloatTensor] = None past_key_values: Optional[Cache] = None hidden_states: Optional[tuple[torch.FloatTensor]] = None attentions: Optional[tuple[torch.FloatTensor]] = None attention_mask: Optional[torch.FloatTensor] = None # Copied from transformers.models.whisper.modeling_whisper.eager_attention_forward def eager_attention_forward( module: nn.Module, query: torch.Tensor, key: torch.Tensor, value: torch.Tensor, attention_mask: Optional[torch.Tensor], scaling: Optional[float] = None, dropout: float = 0.0, head_mask: Optional[torch.Tensor] = None, **kwargs, ): if scaling is None: scaling = query.size(-1) ** -0.5 attn_weights = torch.matmul(query, key.transpose(2, 3)) * scaling if attention_mask is not None and attention_mask.ndim == 4: attn_weights = attn_weights + attention_mask[:, :, :, : key.shape[-2]] attn_weights = nn.functional.softmax(attn_weights, dim=-1) if head_mask is not None: attn_weights = attn_weights * head_mask.view(1, -1, 1, 1) attn_weights = nn.functional.dropout(attn_weights, p=dropout, training=module.training) attn_output = torch.matmul(attn_weights, value) attn_output = attn_output.transpose(1, 2).contiguous() return attn_output, attn_weights class Qwen2AudioAttention(nn.Module): """Multi-headed attention from 'Attention Is All You Need' paper""" # Copied from transformers.models.whisper.modeling_whisper.WhisperAttention.__init__ with Whisper->Qwen2Audio def __init__( self, embed_dim: int, num_heads: int, dropout: float = 0.0, is_decoder: bool = False, bias: bool = True, is_causal: bool = False, layer_idx: Optional[int] = None, config: Optional[Qwen2AudioConfig] = None, ): super().__init__() self.embed_dim = embed_dim self.num_heads = num_heads self.dropout = dropout self.head_dim = embed_dim // num_heads self.config = config if (self.head_dim * num_heads) != self.embed_dim: raise ValueError( f"embed_dim must be divisible by num_heads (got `embed_dim`: {self.embed_dim}" f" and `num_heads`: {num_heads})." ) self.scaling = self.head_dim**-0.5 self.is_decoder = is_decoder self.is_causal = is_causal if layer_idx is None and is_decoder: logger.warning_once( f"Instantiating a decoder {self.__class__.__name__} without passing `layer_idx` is not recommended and " "will to errors during the forward call, if caching is used. Please make sure to provide a `layer_idx` " "when creating this class." ) self.layer_idx = layer_idx self.k_proj = nn.Linear(embed_dim, embed_dim, bias=False) self.v_proj = nn.Linear(embed_dim, embed_dim, bias=bias) self.q_proj = nn.Linear(embed_dim, embed_dim, bias=bias) self.out_proj = nn.Linear(embed_dim, embed_dim, bias=bias) def _shape(self, tensor: torch.Tensor, seq_len: int, bsz: int): return tensor.view(bsz, seq_len, self.num_heads, self.head_dim).transpose(1, 2).contiguous() def forward( self, hidden_states: torch.Tensor, attention_mask: Optional[torch.Tensor] = None, layer_head_mask: Optional[torch.Tensor] = None, output_attentions: bool = False, **kwargs, ) -> tuple[torch.Tensor, Optional[torch.Tensor], Optional[tuple[torch.Tensor]]]: """Input shape: Batch x Time x Channel""" bsz, tgt_len, _ = hidden_states.size() # Scaling is susceptible to floating point arithmetics' inprecisions # which can lead to different results (this is dependent from model # to model, e.g. whisper is one such case). We therefore keep the # original order of scaling to follow the original implementation # and enforce no scaling (1.0) in the attention call below. 
        query_states = self._shape(self.q_proj(hidden_states) * self.scaling, tgt_len, bsz)
        key_states = self._shape(self.k_proj(hidden_states), -1, bsz)
        value_states = self._shape(self.v_proj(hidden_states), -1, bsz)

        attention_interface: Callable = eager_attention_forward
        if self.config._attn_implementation != "eager":
            attention_interface = ALL_ATTENTION_FUNCTIONS[self.config._attn_implementation]

        attn_output, attn_weights = attention_interface(
            self,
            query_states,
            key_states,
            value_states,
            attention_mask,
            dropout=0.0 if not self.training else self.dropout,
            scaling=1.0,
            output_attentions=output_attentions,
            head_mask=layer_head_mask,
            **kwargs,
        )

        attn_output = attn_output.reshape(bsz, tgt_len, -1).contiguous()
        attn_output = self.out_proj(attn_output)

        return attn_output, attn_weights


# Copied from transformers.models.whisper.modeling_whisper.WhisperEncoderLayer with Whisper->Qwen2Audio, WHISPER->QWEN2AUDIO
class Qwen2AudioEncoderLayer(GradientCheckpointingLayer):
    def __init__(self, config: Qwen2AudioConfig):
        super().__init__()
        self.embed_dim = config.d_model

        self.self_attn = Qwen2AudioAttention(
            embed_dim=self.embed_dim,
            num_heads=config.encoder_attention_heads,
            dropout=config.attention_dropout,
            config=config,
        )
        self.self_attn_layer_norm = nn.LayerNorm(self.embed_dim)
        self.dropout = config.dropout
        self.activation_fn = ACT2FN[config.activation_function]
        self.activation_dropout = config.activation_dropout
        self.fc1 = nn.Linear(self.embed_dim, config.encoder_ffn_dim)
        self.fc2 = nn.Linear(config.encoder_ffn_dim, self.embed_dim)
        self.final_layer_norm = nn.LayerNorm(self.embed_dim)

    def forward(
        self,
        hidden_states: torch.Tensor,
        attention_mask: torch.Tensor,
        layer_head_mask: torch.Tensor,
        output_attentions: bool = False,
    ) -> tuple[torch.Tensor, Optional[torch.Tensor]]:
        """
        Args:
            hidden_states (`torch.FloatTensor`): input to the layer of shape `(batch, seq_len, embed_dim)`
            attention_mask (`torch.FloatTensor`): attention mask of size
                `(batch, 1, tgt_len, src_len)` where padding elements are indicated by very large negative values.
            layer_head_mask (`torch.FloatTensor`): mask for attention heads in a given layer of size
                `(encoder_attention_heads,)`.
            output_attentions (`bool`, *optional*):
                Whether or not to return the attentions tensors of all attention layers. See `attentions` under
                returned tensors for more detail.
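
        Returns:
            `tuple(torch.FloatTensor, Optional[torch.FloatTensor])`: the transformed hidden states of shape
            `(batch, seq_len, embed_dim)` and the layer's attention weights (which may be `None`, depending on the
            attention implementation in use).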
""" residual = hidden_states hidden_states = self.self_attn_layer_norm(hidden_states) hidden_states, attn_weights = self.self_attn( hidden_states=hidden_states, attention_mask=attention_mask, layer_head_mask=layer_head_mask, output_attentions=output_attentions, ) hidden_states = nn.functional.dropout(hidden_states, p=self.dropout, training=self.training) hidden_states = residual + hidden_states residual = hidden_states hidden_states = self.final_layer_norm(hidden_states) hidden_states = self.activation_fn(self.fc1(hidden_states)) hidden_states = nn.functional.dropout(hidden_states, p=self.activation_dropout, training=self.training) hidden_states = self.fc2(hidden_states) hidden_states = nn.functional.dropout(hidden_states, p=self.dropout, training=self.training) hidden_states = residual + hidden_states if hidden_states.dtype == torch.float16: clamp_value = torch.finfo(hidden_states.dtype).max - 1000 hidden_states = torch.clamp(hidden_states, min=-clamp_value, max=clamp_value) return hidden_states, attn_weights @auto_docstring class Qwen2AudioPreTrainedModel(PreTrainedModel): config: Qwen2AudioConfig base_model_prefix = "model" supports_gradient_checkpointing = True _no_split_modules = ["Qwen2AudioAttention"] _skip_keys_device_placement = "past_key_values" _supports_flash_attn = True _supports_sdpa = True def _init_weights(self, module): # important: this ported version of Qwen2Audio isn't meant for training from scratch - only # inference and fine-tuning - so the proper init weights code has been removed std = ( self.config.initializer_range if hasattr(self.config, "initializer_range") else self.config.audio_config.initializer_range ) if isinstance(module, (nn.Linear, nn.Conv1d)): module.weight.data.normal_(mean=0.0, std=std) if module.bias is not None: module.bias.data.zero_() elif isinstance(module, nn.LayerNorm): module.weight.data.fill_(1.0) module.bias.data.zero_() elif isinstance(module, nn.Embedding): module.weight.data.normal_(mean=0.0, std=std) if module.padding_idx is not None: module.weight.data[module.padding_idx].zero_() @auto_docstring( custom_intro=""" The audio model from Qwen2Audio without any head or projection on top. """ ) # Copied from transformers.models.whisper.modeling_whisper.WhisperEncoder with Whisper->Qwen2Audio class Qwen2AudioEncoder(Qwen2AudioPreTrainedModel): """ Transformer encoder consisting of *config.encoder_layers* self attention layers. Each layer is a [`Qwen2AudioEncoderLayer`]. 

    Args:
        config: Qwen2AudioEncoderConfig
    """

    # Ignore copy
    config: Qwen2AudioEncoderConfig
    main_input_name = "input_features"
    _no_split_modules = ["Qwen2AudioEncoderLayer"]

    def __init__(self, config: Qwen2AudioEncoderConfig):
        super().__init__(config)
        self.dropout = config.dropout
        self.layerdrop = config.encoder_layerdrop

        embed_dim = config.d_model
        self.num_mel_bins = config.num_mel_bins
        self.padding_idx = config.pad_token_id
        self.max_source_positions = config.max_source_positions
        self.embed_scale = math.sqrt(embed_dim) if config.scale_embedding else 1.0

        self.conv1 = nn.Conv1d(self.num_mel_bins, embed_dim, kernel_size=3, padding=1)
        self.conv2 = nn.Conv1d(embed_dim, embed_dim, kernel_size=3, stride=2, padding=1)

        self.embed_positions = nn.Embedding(self.max_source_positions, embed_dim)
        self.embed_positions.requires_grad_(False)

        self.layers = nn.ModuleList([Qwen2AudioEncoderLayer(config) for _ in range(config.encoder_layers)])
        self.layer_norm = nn.LayerNorm(config.d_model)
        # Ignore copy
        self.avg_pooler = nn.AvgPool1d(2, stride=2)

        self.gradient_checkpointing = False
        # Initialize weights and apply final processing
        self.post_init()

    def _freeze_parameters(self):
        for param in self.parameters():
            param.requires_grad = False
        self._requires_grad = False

    def get_input_embeddings(self) -> nn.Module:
        return self.conv1

    def set_input_embeddings(self, value: nn.Module):
        self.conv1 = value

    def forward(
        self,
        input_features,
        attention_mask=None,
        head_mask=None,
        output_attentions=None,
        output_hidden_states=None,
        return_dict=None,
    ):
        r"""
        Args:
            attention_mask (`torch.Tensor`, *optional*):
                Qwen2Audio does not support masking of the `input_features`; this argument is preserved for
                compatibility but is not used. By default, the silence in the input log mel spectrogram is ignored.
            head_mask (`torch.Tensor` of shape `(encoder_layers, encoder_attention_heads)`, *optional*):
                Mask to nullify selected heads of the attention modules. Mask values selected in `[0, 1]`:

                - 1 indicates the head is **not masked**,
                - 0 indicates the head is **masked**.
            output_attentions (`bool`, *optional*):
                Whether or not to return the attentions tensors of all attention layers. See `attentions` under
                returned tensors for more detail.
            output_hidden_states (`bool`, *optional*):
                Whether or not to return the hidden states of all layers. See `hidden_states` under returned
                tensors for more detail.
            return_dict (`bool`, *optional*):
                Whether or not to return a [`~utils.ModelOutput`] instead of a plain tuple.
        """

        expected_seq_length = self.config.max_source_positions * self.conv1.stride[0] * self.conv2.stride[0]
        if input_features.shape[-1] != expected_seq_length:
            raise ValueError(
                f"Qwen2Audio expects the mel input features to be of length {expected_seq_length}, but found {input_features.shape[-1]}. Make sure to pad the input mel features to {expected_seq_length}."
            )

        output_attentions = output_attentions if output_attentions is not None else self.config.output_attentions
        output_hidden_states = (
            output_hidden_states if output_hidden_states is not None else self.config.output_hidden_states
        )
        return_dict = return_dict if return_dict is not None else self.config.use_return_dict

        # Ignore copy
        input_features = input_features.to(dtype=self.conv1.weight.dtype, device=self.conv1.weight.device)

        inputs_embeds = nn.functional.gelu(self.conv1(input_features))
        inputs_embeds = nn.functional.gelu(self.conv2(inputs_embeds))

        inputs_embeds = inputs_embeds.permute(0, 2, 1)
        embed_pos = self.embed_positions.weight

        hidden_states = inputs_embeds + embed_pos
        hidden_states = nn.functional.dropout(hidden_states, p=self.dropout, training=self.training)

        encoder_states = () if output_hidden_states else None
        all_attentions = () if output_attentions else None

        # check if head_mask has a correct number of layers specified if desired
        if head_mask is not None:
            assert head_mask.size()[0] == (len(self.layers)), (
                f"The head_mask should be specified for {len(self.layers)} layers, but it is for {head_mask.size()[0]}."
            )

        for idx, encoder_layer in enumerate(self.layers):
            if output_hidden_states:
                encoder_states = encoder_states + (hidden_states,)
            # add LayerDrop (see https://huggingface.co/papers/1909.11556 for description)
            to_drop = False
            if self.training:
                dropout_probability = torch.rand([])
                if dropout_probability < self.layerdrop:  # skip the layer
                    to_drop = True

            # Ignore copy
            if to_drop:
                layer_outputs = (None, None)
            else:
                layer_outputs = encoder_layer(
                    hidden_states,
                    attention_mask,
                    layer_head_mask=(head_mask[idx] if head_mask is not None else None),
                    output_attentions=output_attentions,
                )

                hidden_states = layer_outputs[0]

            if output_attentions:
                all_attentions = all_attentions + (layer_outputs[1],)

        # Ignore copy
        hidden_states = hidden_states.permute(0, 2, 1)
        hidden_states = self.avg_pooler(hidden_states)
        hidden_states = hidden_states.permute(0, 2, 1)

        hidden_states = self.layer_norm(hidden_states)
        if output_hidden_states:
            encoder_states = encoder_states + (hidden_states,)

        if not return_dict:
            return tuple(v for v in [hidden_states, encoder_states, all_attentions] if v is not None)
        return BaseModelOutput(
            last_hidden_state=hidden_states, hidden_states=encoder_states, attentions=all_attentions
        )

    # Ignore copy
    def _get_feat_extract_output_lengths(self, input_lengths: torch.LongTensor):
        """
        Computes the output length of the convolutional layers and the output length of the audio encoder
        """
        input_lengths = (input_lengths - 1) // 2 + 1
        output_lengths = (input_lengths - 2) // 2 + 1
        return input_lengths, output_lengths


class Qwen2AudioMultiModalProjector(nn.Module):
    def __init__(self, config: Qwen2AudioConfig):
        super().__init__()
        self.linear = nn.Linear(config.audio_config.d_model, config.text_config.hidden_size, bias=True)

    def forward(self, audio_features):
        hidden_states = self.linear(audio_features)
        return hidden_states


@auto_docstring(
    custom_intro="""
    The QWEN2AUDIO model, which consists of an audio backbone and a language model.
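    It encodes log-mel `input_features` with the audio tower (typically a [`Qwen2AudioEncoder`]), projects the
    resulting audio embeddings to the language model's hidden size with a linear multi-modal projector, and merges
    them into the text sequence at the `<|AUDIO|>` placeholder positions before running the causal language model.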
""" ) class Qwen2AudioForConditionalGeneration(Qwen2AudioPreTrainedModel, GenerationMixin): def __init__(self, config: Qwen2AudioConfig): super().__init__(config) self.audio_tower = AutoModel.from_config(config.audio_config) # Usually a `Qwen2AudioEncoder` instance self.multi_modal_projector = Qwen2AudioMultiModalProjector(config) self.vocab_size = config.text_config.vocab_size self.language_model = AutoModelForCausalLM.from_config(config.text_config) if self.language_model._tied_weights_keys is not None: self._tied_weights_keys = [f"language_model.{k}" for k in self.language_model._tied_weights_keys] self.pad_token_id = self.config.pad_token_id if self.config.pad_token_id is not None else -1 self._padding_side = "left" # set it to left by default, user can use setter to change padding_sides self.post_init() @property def padding_side(self): return self._padding_side @padding_side.setter def padding_side(self, padding_side: str): if padding_side not in ["left", "right"]: raise ValueError(f"{padding_side} is not `left` or `right`.") self._padding_side = padding_side def get_input_embeddings(self): return self.language_model.get_input_embeddings() def set_input_embeddings(self, value): self.language_model.set_input_embeddings(value) def get_output_embeddings(self): return self.language_model.get_output_embeddings() def set_output_embeddings(self, new_embeddings): self.language_model.set_output_embeddings(new_embeddings) def set_decoder(self, decoder): self.language_model.set_decoder(decoder) def get_decoder(self): return self.language_model.get_decoder() def _merge_input_ids_with_audio_features( self, audio_features, num_audio_tokens, inputs_embeds, input_ids, attention_mask, labels ): """ Merge input_ids with with audio features into final embeddings Args: audio_features (`torch.Tensor` of shape `(num_audios, max_audio_tokens, embed_dim)`): All audio vectors of all audios in the batch num_audio_tokens (`torch.LongTensor` of shape `(num_audios)`): The length of audio embeddings of each audio as stacked in `audio_features` inputs_embeds (`torch.Tensor` of shape `(batch_size, sequence_length, embed_dim)`): Token embeddings before merging with audio embeddings input_ids (`torch.LongTensor` of shape `(batch_size, sequence_length)`): Input_ids of tokens, possibly filled with audio token attention_mask (`torch.LongTensor` of shape `(batch_size, sequence_length)`): Mask to avoid performing attention on padding token indices. 
            labels (`torch.Tensor` of shape `(batch_size, sequence_length)`, *optional*):
                Labels that need to be recalculated to support training (if provided)
        Returns:
            final_embedding, final_attention_mask, final_labels, position_ids, final_input_ids

        Explanation:
            Each audio has variable-length embeddings, with length specified by num_audio_tokens.
            audio_features is the concatenation of all audio embed vectors.
            Task: fill each <|AUDIO|> with the correct number of audio embeddings.
            Example:
                X (5 tokens), Y (3 tokens), Z (8 tokens)
                X, Y are in the same sequence (in-context learning)
            if right padding
                input_ids: [
                    a b c d e f X g h i j k Y l m
                    o p q r Z s t u v _ _ _ _ _ _
                ]
                input_ids should be: [
                    a b c d e f X X X X X g h i j k Y Y Y l m
                    o p q r Z Z Z Z Z Z Z Z s t u v _ _ _ _ _
                ]
                labels should be: [
                    a b c d e f _ _ _ _ _ g h i j k _ _ _ l m
                    o p q r _ _ _ _ _ _ _ _ s t u v _ _ _ _ _
                ]
            elif left padding
                input_ids: [
                    a b c d e f X g h i j k Y l m
                    _ _ _ _ _ _ o p q r Z s t u v
                ]
                input_ids should be: [
                    a b c d e f X X X X X g h i j k Y Y Y l m
                    _ _ _ _ _ o p q r Z Z Z Z Z Z Z Z s t u v
                ]
                labels should be: [
                    a b c d e f _ _ _ _ _ g h i j k _ _ _ l m
                    _ _ _ _ _ o p q r _ _ _ _ _ _ _ _ s t u v
                ]
            Edge cases:
                * If the tokens are the same but the audio token sizes are different, then we cannot infer left or
                  right padding

                ```python
                url1 = "https://qianwen-res.oss-cn-beijing.aliyuncs.com/Qwen2-Audio/audio/glass-breaking-151256.mp3"
                audio1, _ = librosa.load(BytesIO(urlopen(url1).read()), sr=processor.feature_extractor.sampling_rate)
                url2 = "https://qianwen-res.oss-cn-beijing.aliyuncs.com/Qwen2-Audio/audio/f2641_0_throatclearing.wav"
                audio2, _ = librosa.load(BytesIO(urlopen(url2).read()), sr=processor.feature_extractor.sampling_rate)
                prompts = [
                    "[INST] <|AUDIO|>\nWhat is that in this audio? [/INST]",
                    "[INST] <|AUDIO|>\nWhat is that in this audio? [/INST]",
                ]
                inputs = processor(text=prompts, audios=[audio1, audio2], return_tensors='pt', padding=True).to("cuda")
                # audio1 has 101 tokens, while audio2 has 72 tokens
                ```

                input_ids: [
                    a b c d X g h
                    i j Y k l m n
                ]
                where X is 3 tokens while Y is 5; this means that after merging,
                if left-padding (batched generation)
                    input_ids should be: [
                        _ _ a b c d X X X g h
                        i j Y Y Y Y Y k l m n
                    ]
                elif right padding (training)
                    input_ids should be: [
                        a b c d X X X g h _ _
                        i j Y Y Y Y Y k l m n
                    ]
        """
        num_audios, max_audio_tokens, embed_dim = audio_features.shape
        audio_features_mask = torch.arange(max_audio_tokens).expand(num_audios, max_audio_tokens).to(
            num_audio_tokens.device
        ) < num_audio_tokens.unsqueeze(1)
        masked_audio_features = audio_features[audio_features_mask].view(-1, embed_dim)
        batch_size, sequence_length = input_ids.shape
        _left_padding = torch.any(attention_mask[:, 0] == 0)
        _right_padding = torch.any(attention_mask[:, -1] == 0)

        left_padding = True
        if batch_size > 1:
            if _left_padding and not _right_padding:
                left_padding = True
            elif not _left_padding and _right_padding:
                left_padding = False
            elif not _left_padding and not _right_padding:
                # both sides are 1, so we cannot tell which side is padded
                left_padding = self.padding_side == "left"
            else:
                # invalid attention_mask
                raise ValueError(f"both sides of attention_mask have zeros, invalid. {attention_mask}")

        # 1. Create a mask to know where special audio tokens are
        special_audio_token_mask = input_ids == self.config.audio_token_id
        num_special_audio_tokens = torch.sum(special_audio_token_mask, dim=-1)

        # In case the Audio model or the Language model has been offloaded to CPU, we need to manually
        # set the corresponding tensors into their correct target device.
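        # (This can happen, for example, when parts of the model are dispatched with `accelerate`'s `device_map`:
        # `input_ids` and `attention_mask` may then still live on CPU while `inputs_embeds` is already on the
        # execution device.)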
        target_device = inputs_embeds.device
        attention_mask = attention_mask.to(target_device)
        input_ids = input_ids.to(target_device)
        num_audio_tokens = num_audio_tokens.to(target_device)
        batch_indices, non_audio_indices = torch.where(
            (input_ids != self.config.audio_token_id) & (attention_mask == 1)
        )

        # 2. Compute the positions where text should be written
        # Calculate new positions for text tokens in merged audio-text sequence.
        # `special_audio_token_mask` identifies audio tokens. Each audio token will be replaced by `audio_feat_lengths - 1` text tokens.
        # `torch.cumsum` computes how each audio token shifts subsequent text token positions.
        token_placeholder_num = torch.zeros_like(input_ids)
        token_placeholder_num[special_audio_token_mask] = num_audio_tokens.long() - 1
        token_placeholder_num = token_placeholder_num + 1
        new_token_positions = torch.cumsum(token_placeholder_num, -1) - 1
        max_token_num = token_placeholder_num.sum(-1).max()
        nb_audio_pad = max_token_num - 1 - new_token_positions[:, -1]
        if left_padding:
            new_token_positions += nb_audio_pad[:, None]  # offset for left padding
        text_to_overwrite = new_token_positions[batch_indices, non_audio_indices]
        batch_indices, non_audio_indices, text_to_overwrite = (
            batch_indices.to(target_device),
            non_audio_indices.to(target_device),
            text_to_overwrite.to(target_device),
        )

        # 3. Create the full embedding, already padded to the maximum position
        final_embedding = torch.zeros(
            batch_size, max_token_num, embed_dim, dtype=inputs_embeds.dtype, device=inputs_embeds.device
        )
        final_attention_mask = torch.zeros(
            batch_size, max_token_num, dtype=attention_mask.dtype, device=inputs_embeds.device
        )
        final_input_ids = torch.full(
            (batch_size, max_token_num), self.pad_token_id, dtype=input_ids.dtype, device=inputs_embeds.device
        )
        # 4. Fill the embeddings based on the mask. If we have ["hey" "