# coding=utf-8
# Copyright 2025 The Qwen team, Alibaba Group and the HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""PyTorch Qwen3-Next model."""

from typing import Any, Callable, Optional, Union

import torch
import torch.nn.functional as F
from torch import nn

from ...activations import ACT2FN
from ...cache_utils import Cache
from ...masking_utils import create_causal_mask
from ...modeling_flash_attention_utils import FlashAttentionKwargs
from ...modeling_outputs import MoeCausalLMOutputWithPast, MoeModelOutputWithPast
from ...modeling_utils import ALL_ATTENTION_FUNCTIONS, PreTrainedModel
from ...processing_utils import Unpack
from ...utils import TransformersKwargs, auto_docstring, logging
from ...utils.generic import OutputRecorder, check_model_inputs
from ...utils.import_utils import (
    is_causal_conv1d_available,
    is_flash_linear_attention_available,
)
from ..bamba.modeling_bamba import apply_mask_to_padding_states, apply_rotary_pos_emb
from ..gemma3.modeling_gemma3 import Gemma3RMSNorm
from ..llama.modeling_llama import (
    LlamaForQuestionAnswering,
    LlamaForSequenceClassification,
    LlamaForTokenClassification,
)
from ..mixtral.modeling_mixtral import MixtralForCausalLM
from ..qwen2_moe.modeling_qwen2_moe import Qwen2MoeSparseMoeBlock
from ..qwen3_moe.modeling_qwen3_moe import (
    Qwen3MoeAttention,
    Qwen3MoeDecoderLayer,
    Qwen3MoeMLP,
    Qwen3MoeRotaryEmbedding,
    eager_attention_forward,
)
from .configuration_qwen3_next import Qwen3NextConfig


if is_causal_conv1d_available():
    from causal_conv1d import causal_conv1d_fn, causal_conv1d_update
else:
    causal_conv1d_update, causal_conv1d_fn = None, None

if is_flash_linear_attention_available():
    from fla.modules import FusedRMSNormGated
    from fla.ops.gated_delta_rule import chunk_gated_delta_rule, fused_recurrent_gated_delta_rule
else:
    chunk_gated_delta_rule, fused_recurrent_gated_delta_rule = None, None
    FusedRMSNormGated = None

is_fast_path_available = all(
    (causal_conv1d_fn, causal_conv1d_update, chunk_gated_delta_rule, fused_recurrent_gated_delta_rule)
)


logger = logging.get_logger(__name__)


class Qwen3NextRMSNormGated(nn.Module):
    def __init__(self, hidden_size, eps=1e-6, **kwargs):
        super().__init__()
        self.weight = nn.Parameter(torch.ones(hidden_size))
        self.variance_epsilon = eps

    def forward(self, hidden_states, gate=None):
        input_dtype = hidden_states.dtype
        hidden_states = hidden_states.to(torch.float32)
        variance = hidden_states.pow(2).mean(-1, keepdim=True)

        # Norm before gate
        hidden_states = hidden_states * torch.rsqrt(variance + self.variance_epsilon)
        hidden_states = self.weight * hidden_states.to(input_dtype)
        hidden_states = hidden_states * F.silu(gate.to(torch.float32))

        return hidden_states.to(input_dtype)

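# Illustrative usage sketch (comments only, not executed): the gated norm normalizes in fp32
# first and only then applies the SiLU gate, so the gate never affects the variance estimate:
#
#   norm = Qwen3NextRMSNormGated(hidden_size=128)
#   x, z = torch.randn(4, 128), torch.randn(4, 128)
#   y = norm(x, gate=z)        # rmsnorm(x) * weight, rescaled by silu(z); shape (4, 128)
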
class Qwen3NextDynamicCache:
    """
    A dynamic cache that can handle both the attention cache (which has a seq_len dimension) and the linear attention
    cache (which has a constant shape regardless of seq_len).

    This cache has two sets of lists of tensors: `key_cache` and `value_cache` for the attention cache and
    `conv_states` and `recurrent_states` for the gated deltanet cache. Each of these lists has `num_layers` tensors.

    For attention layers, `key_cache` and `value_cache` have a shape of `(batch_size, num_heads, seq_len, head_dim)`,
    while `conv_states` and `recurrent_states` are unused and stay `None`.
    For linear attention layers, `key_cache` and `value_cache` are unused and stay `None`, while `conv_states`
    represents the convolution state and has a shape of `(batch_size, d_inner, d_conv)`, and `recurrent_states`
    represents the recurrent state and has a shape of `(batch_size, d_inner, d_state)`.
    """

    is_compileable = False

    def __init__(self, config: Qwen3NextConfig):
        super().__init__()
        self.layer_types = config.layer_types
        self.transformer_layers = [
            i for i in range(config.num_hidden_layers) if self.layer_types[i] == "full_attention"
        ]
        self.last_linear_layer = len(self.layer_types) - 1 - self.layer_types[::-1].index("linear_attention")

        # Initialize everything to None -> will be lazily initialized to allow multi-gpu (device_map) inference
        self.conv_states = [None for _ in range(config.num_hidden_layers)]
        self.recurrent_states = [None for _ in range(config.num_hidden_layers)]
        self.key_cache = [None for _ in range(config.num_hidden_layers)]
        self.value_cache = [None for _ in range(config.num_hidden_layers)]

    def __len__(self):
        return len(self.layer_types)

    def __getitem__(self, layer_idx: int) -> tuple[torch.Tensor, torch.Tensor]:
        return self.key_cache[layer_idx], self.value_cache[layer_idx]

    def update(
        self,
        key_states: torch.Tensor,
        value_states: torch.Tensor,
        layer_idx: int,
        cache_kwargs: Optional[dict[str, Any]] = None,
    ) -> tuple[torch.Tensor, torch.Tensor]:
        if self.key_cache[layer_idx] is None:
            self.key_cache[layer_idx] = key_states
            self.value_cache[layer_idx] = value_states
        else:
            self.key_cache[layer_idx] = torch.cat([self.key_cache[layer_idx], key_states], dim=2)
            self.value_cache[layer_idx] = torch.cat([self.value_cache[layer_idx], value_states], dim=2)

        return self.key_cache[layer_idx], self.value_cache[layer_idx]

    def reorder_cache(self, beam_idx: torch.LongTensor):
        """Reorders the cache for beam search, given the selected beam indices."""
        for layer_idx in range(len(self.key_cache)):
            if self.key_cache[layer_idx] is not None:
                device = self.key_cache[layer_idx].device
                beam_idx = beam_idx.to(device)
                self.key_cache[layer_idx] = self.key_cache[layer_idx].index_select(0, beam_idx)
                self.value_cache[layer_idx] = self.value_cache[layer_idx].index_select(0, beam_idx)
            if self.conv_states[layer_idx] is not None:
                device = self.conv_states[layer_idx].device
                beam_idx = beam_idx.to(device)
                self.conv_states[layer_idx] = self.conv_states[layer_idx].index_select(0, beam_idx)
                self.recurrent_states[layer_idx] = self.recurrent_states[layer_idx].index_select(0, beam_idx)

    def get_seq_length(self, layer_idx: Optional[int] = 0) -> int:
        """Returns the sequence length of the cached states. A layer index can be optionally passed."""
        # take any layer that contains a cache and is not an empty tensor
        layer_idx = self.transformer_layers[0] if layer_idx not in self.transformer_layers else layer_idx
        if len(self.key_cache) <= layer_idx or self.key_cache[layer_idx] is None:
            return 0
        return self.key_cache[layer_idx].shape[-2]

    def get_mask_sizes(self, cache_position: torch.Tensor, layer_idx: int) -> tuple[int, int]:
        """
        Return a tuple (kv_length, kv_offset) corresponding to the length and offset that will be returned for
        the given layer at `layer_idx`.
        The masks are then prepared according to the given lengths (kv_length, kv_offset) and patterns for each layer.
        """
        kv_offset = 0
        query_length = cache_position.shape[0]
        past_seen_tokens = self.get_seq_length(layer_idx)
        kv_length = query_length + past_seen_tokens
        return kv_length, kv_offset

    @property
    def has_previous_state(self):
        """We have a previous state if the last linear (conv) layer was already updated."""
        return self.conv_states[self.last_linear_layer] is not None

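# Illustrative sketch (comments only, not executed): the same cache object serves both layer kinds.
# A full-attention layer only touches `key_cache`/`value_cache`, a linear-attention layer only
# touches `conv_states`/`recurrent_states` (the layer indices below are placeholders):
#
#   cache = Qwen3NextDynamicCache(config=config)       # every entry starts as None
#   cache.update(k, v, layer_idx=attn_idx)             # grows along the seq_len dim (dim=2)
#   cache.conv_states[linear_idx] = conv_state         # fixed-size, overwritten in place
#   cache.get_seq_length()                             # length seen by the first full-attention layer
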
class Qwen3NextRotaryEmbedding(Qwen3MoeRotaryEmbedding):
    pass


class Qwen3NextRMSNorm(Gemma3RMSNorm):
    pass


class Qwen3NextAttention(Qwen3MoeAttention):
    def __init__(self, config: Qwen3NextConfig, layer_idx: int):
        super().__init__(config, layer_idx)
        self.q_proj = nn.Linear(
            config.hidden_size, config.num_attention_heads * self.head_dim * 2, bias=config.attention_bias
        )
        del self.sliding_window

    def forward(
        self,
        hidden_states: torch.Tensor,
        position_embeddings: tuple[torch.Tensor, torch.Tensor],
        attention_mask: Optional[torch.Tensor],
        past_key_values: Optional[Cache] = None,
        cache_position: Optional[torch.LongTensor] = None,
        **kwargs: Unpack[FlashAttentionKwargs],
    ) -> tuple[torch.Tensor, Optional[torch.Tensor]]:
        input_shape = hidden_states.shape[:-1]
        hidden_shape = (*input_shape, -1, self.head_dim)

        query_states, gate = torch.chunk(
            self.q_proj(hidden_states).view(*input_shape, -1, self.head_dim * 2), 2, dim=-1
        )
        gate = gate.reshape(*input_shape, -1)

        query_states = self.q_norm(query_states.view(hidden_shape)).transpose(1, 2)
        key_states = self.k_norm(self.k_proj(hidden_states).view(hidden_shape)).transpose(1, 2)
        value_states = self.v_proj(hidden_states).view(hidden_shape).transpose(1, 2)

        cos, sin = position_embeddings
        query_states, key_states = apply_rotary_pos_emb(query_states, key_states, cos, sin)

        if past_key_values is not None:
            # sin and cos are specific to RoPE models; cache_position needed for the static cache
            cache_kwargs = {"sin": sin, "cos": cos, "cache_position": cache_position}
            key_states, value_states = past_key_values.update(key_states, value_states, self.layer_idx, cache_kwargs)

        attention_interface: Callable = eager_attention_forward
        if self.config._attn_implementation != "eager":
            attention_interface = ALL_ATTENTION_FUNCTIONS[self.config._attn_implementation]

        attn_output, attn_weights = attention_interface(
            self,
            query_states,
            key_states,
            value_states,
            attention_mask,
            dropout=0.0 if not self.training else self.attention_dropout,
            scaling=self.scaling,
            **kwargs,
        )

        attn_output = attn_output.reshape(*input_shape, -1).contiguous()
        attn_output = attn_output * torch.sigmoid(gate)
        attn_output = self.o_proj(attn_output)
        return attn_output, attn_weights

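# Illustrative sketch (comments only, not executed): unlike the parent Qwen3MoeAttention, `q_proj`
# emits twice the usual width; one half becomes the query heads, the other half a gate that
# rescales the attention output right before `o_proj`:
#
#   qg = self.q_proj(x).view(batch, seq, -1, self.head_dim * 2)
#   q, gate = torch.chunk(qg, 2, dim=-1)               # each (batch, seq, num_heads, head_dim)
#   out = self.o_proj(attn_out * torch.sigmoid(gate.reshape(batch, seq, -1)))
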
def torch_causal_conv1d_update(
    hidden_states,
    conv_state,
    weight,
    bias=None,
    activation=None,
):
    _, hidden_size, seq_len = hidden_states.shape
    state_len = conv_state.shape[-1]

    hidden_states_new = torch.cat([conv_state, hidden_states], dim=-1).to(weight.dtype)
    conv_state.copy_(hidden_states_new[:, :, -state_len:])

    out = F.conv1d(hidden_states_new, weight.unsqueeze(1), bias, padding=0, groups=hidden_size)
    out = F.silu(out[:, :, -seq_len:])
    out = out.to(hidden_states.dtype)
    return out


def l2norm(x: torch.FloatTensor, dim: int = -1, eps: float = 1e-6):
    """This function is intended to align with the l2norm implementation in the FLA library."""
    inv_norm = torch.rsqrt((x * x).sum(dim=dim, keepdim=True) + eps)
    return x * inv_norm


def torch_chunk_gated_delta_rule(
    query,
    key,
    value,
    g,
    beta,
    chunk_size=64,
    initial_state=None,
    output_final_state=False,
    use_qk_l2norm_in_kernel=False,
):
    initial_dtype = query.dtype
    if use_qk_l2norm_in_kernel:
        query = l2norm(query, dim=-1, eps=1e-6)
        key = l2norm(key, dim=-1, eps=1e-6)
    query, key, value, beta, g = [
        x.transpose(1, 2).contiguous().to(torch.float32) for x in (query, key, value, beta, g)
    ]

    batch_size, num_heads, sequence_length, k_head_dim = key.shape
    v_head_dim = value.shape[-1]
    pad_size = (chunk_size - sequence_length % chunk_size) % chunk_size
    query = F.pad(query, (0, 0, 0, pad_size))
    key = F.pad(key, (0, 0, 0, pad_size))
    value = F.pad(value, (0, 0, 0, pad_size))
    beta = F.pad(beta, (0, pad_size))
    g = F.pad(g, (0, pad_size))
    total_sequence_length = sequence_length + pad_size
    scale = 1 / (query.shape[-1] ** 0.5)
    query = query * scale

    v_beta = value * beta.unsqueeze(-1)
    k_beta = key * beta.unsqueeze(-1)
    # reshape to chunks
    query, key, value, k_beta, v_beta = [
        x.reshape(x.shape[0], x.shape[1], -1, chunk_size, x.shape[-1]) for x in (query, key, value, k_beta, v_beta)
    ]
    g = g.reshape(g.shape[0], g.shape[1], -1, chunk_size)

    mask = torch.triu(torch.ones(chunk_size, chunk_size, dtype=torch.bool, device=query.device), diagonal=0)

    # chunk decay
    g = g.cumsum(dim=-1)
    decay_mask = ((g.unsqueeze(-1) - g.unsqueeze(-2)).tril().exp().float()).tril()
    attn = -((k_beta @ key.transpose(-1, -2)) * decay_mask).masked_fill(mask, 0)
    for i in range(1, chunk_size):
        row = attn[..., i, :i].clone()
        sub = attn[..., :i, :i].clone()
        attn[..., i, :i] = row + (row.unsqueeze(-1) * sub).sum(-2)
    attn = attn + torch.eye(chunk_size, dtype=attn.dtype, device=attn.device)
    value = attn @ v_beta
    k_cumdecay = attn @ (k_beta * g.exp().unsqueeze(-1))

    last_recurrent_state = (
        torch.zeros(batch_size, num_heads, k_head_dim, v_head_dim).to(value)
        if initial_state is None
        else initial_state.to(value)
    )
    core_attn_out = torch.zeros_like(value)
    mask = torch.triu(torch.ones(chunk_size, chunk_size, dtype=torch.bool, device=query.device), diagonal=1)

    # for each chunk
    for i in range(0, total_sequence_length // chunk_size):
        q_i, k_i, v_i = query[:, :, i], key[:, :, i], value[:, :, i]
        attn = (q_i @ k_i.transpose(-1, -2) * decay_mask[:, :, i]).masked_fill_(mask, 0)
        v_prime = (k_cumdecay[:, :, i]) @ last_recurrent_state
        v_new = v_i - v_prime
        attn_inter = (q_i * g[:, :, i, :, None].exp()) @ last_recurrent_state
        core_attn_out[:, :, i] = attn_inter + attn @ v_new
        last_recurrent_state = (
            last_recurrent_state * g[:, :, i, -1, None, None].exp()
            + (k_i * (g[:, :, i, -1, None] - g[:, :, i]).exp()[..., None]).transpose(-1, -2) @ v_new
        )

    if not output_final_state:
        last_recurrent_state = None
    core_attn_out = core_attn_out.reshape(core_attn_out.shape[0], core_attn_out.shape[1], -1, core_attn_out.shape[-1])
    core_attn_out = core_attn_out[:, :, :sequence_length]
    core_attn_out = core_attn_out.transpose(1, 2).contiguous().to(initial_dtype)
    return core_attn_out, last_recurrent_state

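# Illustrative sketch (comments only, not executed): the chunked reference path expects inputs of
# shape (batch, seq_len, num_heads, head_dim) and internally pads the sequence up to a multiple of
# `chunk_size` before processing it chunk by chunk:
#
#   q = torch.randn(1, 100, 8, 64)
#   k = torch.randn(1, 100, 8, 64)
#   v = torch.randn(1, 100, 8, 128)
#   g, beta = -torch.rand(1, 100, 8), torch.rand(1, 100, 8)
#   out, state = torch_chunk_gated_delta_rule(q, k, v, g, beta, output_final_state=True)
#   # out: (1, 100, 8, 128) in the input dtype, state: (1, 8, 64, 128) final recurrent state
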
def torch_recurrent_gated_delta_rule(
    query, key, value, g, beta, initial_state, output_final_state, use_qk_l2norm_in_kernel=False
):
    initial_dtype = query.dtype
    if use_qk_l2norm_in_kernel:
        query = l2norm(query, dim=-1, eps=1e-6)
        key = l2norm(key, dim=-1, eps=1e-6)
    query, key, value, beta, g = [
        x.transpose(1, 2).contiguous().to(torch.float32) for x in (query, key, value, beta, g)
    ]

    batch_size, num_heads, sequence_length, k_head_dim = key.shape
    v_head_dim = value.shape[-1]
    scale = 1 / (query.shape[-1] ** 0.5)
    query = query * scale

    core_attn_out = torch.zeros(batch_size, num_heads, sequence_length, v_head_dim).to(value)
    last_recurrent_state = (
        torch.zeros(batch_size, num_heads, k_head_dim, v_head_dim).to(value)
        if initial_state is None
        else initial_state.to(value)
    )

    for i in range(sequence_length):
        q_t = query[:, :, i]
        k_t = key[:, :, i]
        v_t = value[:, :, i]
        g_t = g[:, :, i].exp().unsqueeze(-1).unsqueeze(-1)
        beta_t = beta[:, :, i].unsqueeze(-1)

        last_recurrent_state = last_recurrent_state * g_t
        kv_mem = (last_recurrent_state * k_t.unsqueeze(-1)).sum(dim=-2)
        delta = (v_t - kv_mem) * beta_t
        last_recurrent_state = last_recurrent_state + k_t.unsqueeze(-1) * delta.unsqueeze(-2)
        core_attn_out[:, :, i] = (last_recurrent_state * q_t.unsqueeze(-1)).sum(dim=-2)

    if not output_final_state:
        last_recurrent_state = None
    core_attn_out = core_attn_out.transpose(1, 2).contiguous().to(initial_dtype)
    return core_attn_out, last_recurrent_state

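# Reading guide for the recurrence above (per head, per time step): the state S has shape
# (k_head_dim, v_head_dim) and is updated with a decayed rank-1 delta rule,
#
#   S   <- exp(g_t) * S                 # gated decay
#   err  = v_t - k_t^T S                # prediction error at key k_t
#   S   <- S + k_t (outer) (beta_t * err)
#   o_t  = q_t^T S                      # read-out for the (scaled) query
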
class Qwen3NextGatedDeltaNet(nn.Module):
    def __init__(self, config: Qwen3NextConfig, layer_idx: int):
        super().__init__()
        self.hidden_size = config.hidden_size
        self.num_v_heads = config.linear_num_value_heads
        self.num_k_heads = config.linear_num_key_heads
        self.head_k_dim = config.linear_key_head_dim
        self.head_v_dim = config.linear_value_head_dim
        self.key_dim = self.head_k_dim * self.num_k_heads
        self.value_dim = self.head_v_dim * self.num_v_heads

        self.conv_kernel_size = config.linear_conv_kernel_dim
        self.layer_idx = layer_idx
        self.activation = config.hidden_act
        self.act = ACT2FN[config.hidden_act]
        self.layer_norm_epsilon = config.rms_norm_eps

        # QKV
        self.conv_dim = self.key_dim * 2 + self.value_dim
        self.conv1d = nn.Conv1d(
            in_channels=self.conv_dim,
            out_channels=self.conv_dim,
            bias=False,
            kernel_size=self.conv_kernel_size,
            groups=self.conv_dim,
            padding=self.conv_kernel_size - 1,
        )

        # projection of the input hidden states
        projection_size_qkvz = self.key_dim * 2 + self.value_dim * 2
        projection_size_ba = self.num_v_heads * 2
        self.in_proj_qkvz = nn.Linear(self.hidden_size, projection_size_qkvz, bias=False)
        self.in_proj_ba = nn.Linear(self.hidden_size, projection_size_ba, bias=False)

        # time step projection (discretization)
        # instantiate once and copy inv_dt in init_weights of PretrainedModel
        self.dt_bias = nn.Parameter(torch.ones(self.num_v_heads))

        A = torch.empty(self.num_v_heads).uniform_(0, 16)
        self.A_log = nn.Parameter(torch.log(A))
        self.norm = (
            Qwen3NextRMSNormGated(self.head_v_dim, eps=self.layer_norm_epsilon)
            if FusedRMSNormGated is None
            else FusedRMSNormGated(
                self.head_v_dim,
                eps=self.layer_norm_epsilon,
                activation=self.activation,
                device=torch.cuda.current_device(),
                dtype=config.dtype if config.dtype is not None else torch.get_default_dtype(),
            )
        )

        self.out_proj = nn.Linear(self.value_dim, self.hidden_size, bias=False)

        self.causal_conv1d_fn = causal_conv1d_fn
        self.causal_conv1d_update = causal_conv1d_update or torch_causal_conv1d_update
        self.chunk_gated_delta_rule = chunk_gated_delta_rule or torch_chunk_gated_delta_rule
        self.recurrent_gated_delta_rule = fused_recurrent_gated_delta_rule or torch_recurrent_gated_delta_rule

        if not is_fast_path_available:
            logger.warning_once(
                "The fast path is not available because one of the required libraries is not installed. Falling back"
                " to the torch implementation. To install, follow"
                " https://github.com/fla-org/flash-linear-attention#installation and"
                " https://github.com/Dao-AILab/causal-conv1d"
            )

    def fix_query_key_value_ordering(self, mixed_qkvz, mixed_ba):
        """
        Derives `query`, `key` and `value` tensors from `mixed_qkvz` and `mixed_ba`.
        """
        new_tensor_shape_qkvz = mixed_qkvz.size()[:-1] + (
            self.num_k_heads,
            2 * self.head_k_dim + 2 * self.head_v_dim * self.num_v_heads // self.num_k_heads,
        )
        new_tensor_shape_ba = mixed_ba.size()[:-1] + (self.num_k_heads, 2 * self.num_v_heads // self.num_k_heads)

        mixed_qkvz = mixed_qkvz.view(*new_tensor_shape_qkvz)
        mixed_ba = mixed_ba.view(*new_tensor_shape_ba)

        split_arg_list_qkvz = [
            self.head_k_dim,
            self.head_k_dim,
            (self.num_v_heads // self.num_k_heads * self.head_v_dim),
            (self.num_v_heads // self.num_k_heads * self.head_v_dim),
        ]
        split_arg_list_ba = [self.num_v_heads // self.num_k_heads, self.num_v_heads // self.num_k_heads]

        query, key, value, z = torch.split(mixed_qkvz, split_arg_list_qkvz, dim=3)
        b, a = torch.split(mixed_ba, split_arg_list_ba, dim=3)

        # [b, sq, ng, np/ng * hn] -> [b, sq, np, hn]
        value = value.reshape(value.size(0), value.size(1), -1, self.head_v_dim)
        z = z.reshape(z.size(0), z.size(1), -1, self.head_v_dim)
        b = b.reshape(b.size(0), b.size(1), self.num_v_heads)
        a = a.reshape(a.size(0), a.size(1), self.num_v_heads)

        return query, key, value, z, b, a

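    # Illustrative shape bookkeeping (comments only; the head counts below are assumptions chosen
    # for the example, e.g. num_k_heads=16, num_v_heads=32, head_k_dim=head_v_dim=128):
    # `in_proj_qkvz` emits 2 * key_dim + 2 * value_dim = 12288 features per token, grouped per key
    # head so that each key head carries its num_v_heads // num_k_heads = 2 value/z heads, while
    # `in_proj_ba` emits 2 * num_v_heads = 64 features that become the per-value-head beta (b)
    # and decay input (a).
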
    def forward(
        self,
        hidden_states: torch.Tensor,
        cache_params: Optional[Qwen3NextDynamicCache] = None,
        cache_position: Optional[torch.LongTensor] = None,
        attention_mask: Optional[torch.Tensor] = None,
    ):
        hidden_states = apply_mask_to_padding_states(hidden_states, attention_mask)

        # Set up dimensions for reshapes later
        batch_size, seq_len, _ = hidden_states.shape

        use_precomputed_states = (
            cache_params is not None
            and cache_params.has_previous_state
            and seq_len == 1
            and cache_position is not None
        )

        # getting projected states from cache if it exists
        if cache_params is not None:
            conv_state = cache_params.conv_states[self.layer_idx]
            recurrent_state = cache_params.recurrent_states[self.layer_idx]

        projected_states_qkvz = self.in_proj_qkvz(hidden_states)
        projected_states_ba = self.in_proj_ba(hidden_states)
        query, key, value, z, b, a = self.fix_query_key_value_ordering(projected_states_qkvz, projected_states_ba)
        query, key, value = (x.reshape(x.shape[0], x.shape[1], -1) for x in (query, key, value))

        mixed_qkv = torch.cat((query, key, value), dim=-1)
        mixed_qkv = mixed_qkv.transpose(1, 2)

        if use_precomputed_states:
            # 2. Convolution sequence transformation
            # NOTE: the conv state is updated in `causal_conv1d_update`
            mixed_qkv = self.causal_conv1d_update(
                mixed_qkv,
                conv_state,
                self.conv1d.weight.squeeze(1),
                self.conv1d.bias,
                self.activation,
            )
        else:
            if cache_params is not None:
                conv_state = F.pad(mixed_qkv, (self.conv_kernel_size - mixed_qkv.shape[-1], 0))
                cache_params.conv_states[self.layer_idx] = conv_state

            if self.causal_conv1d_fn is not None:
                mixed_qkv = self.causal_conv1d_fn(
                    x=mixed_qkv,
                    weight=self.conv1d.weight.squeeze(1),
                    bias=self.conv1d.bias,
                    activation=self.activation,
                    seq_idx=None,
                )
            else:
                mixed_qkv = F.silu(self.conv1d(mixed_qkv)[:, :, :seq_len])

        mixed_qkv = mixed_qkv.transpose(1, 2)
        query, key, value = torch.split(
            mixed_qkv,
            [
                self.key_dim,
                self.key_dim,
                self.value_dim,
            ],
            dim=-1,
        )
        query = query.reshape(query.shape[0], query.shape[1], -1, self.head_k_dim)
        key = key.reshape(key.shape[0], key.shape[1], -1, self.head_k_dim)
        value = value.reshape(value.shape[0], value.shape[1], -1, self.head_v_dim)

        beta = b.sigmoid()
        # If the model is loaded in fp16, without the .float() here, A might be -inf
        g = -self.A_log.float().exp() * F.softplus(a.float() + self.dt_bias)
        if self.num_v_heads // self.num_k_heads > 1:
            query = query.repeat_interleave(self.num_v_heads // self.num_k_heads, dim=2)
            key = key.repeat_interleave(self.num_v_heads // self.num_k_heads, dim=2)

        if not use_precomputed_states:
            core_attn_out, last_recurrent_state = self.chunk_gated_delta_rule(
                query,
                key,
                value,
                g=g,
                beta=beta,
                initial_state=None,
                output_final_state=cache_params is not None,
                use_qk_l2norm_in_kernel=True,
            )
        else:
            core_attn_out, last_recurrent_state = self.recurrent_gated_delta_rule(
                query,
                key,
                value,
                g=g,
                beta=beta,
                initial_state=recurrent_state,
                output_final_state=cache_params is not None,
                use_qk_l2norm_in_kernel=True,
            )

        # Update cache
        if cache_params is not None:
            cache_params.recurrent_states[self.layer_idx] = last_recurrent_state

        z_shape_og = z.shape
        # reshape input data into 2D tensor
        core_attn_out = core_attn_out.reshape(-1, core_attn_out.shape[-1])
        z = z.reshape(-1, z.shape[-1])
        core_attn_out = self.norm(core_attn_out, z)
        core_attn_out = core_attn_out.reshape(z_shape_og)
        core_attn_out = core_attn_out.reshape(core_attn_out.shape[0], core_attn_out.shape[1], -1)

        output = self.out_proj(core_attn_out)
        return output


class Qwen3NextMLP(Qwen3MoeMLP):
    pass


class Qwen3NextSparseMoeBlock(Qwen2MoeSparseMoeBlock):
    pass

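# Illustrative sketch (comments only, not executed): `config.layer_types` picks the token mixer of
# each layer and `config.decoder_sparse_step` picks which layers get the sparse MoE MLP. For
# example, assuming num_experts > 0 and an empty mlp_only_layers, with
#
#   layer_types = ["linear_attention", "linear_attention", "linear_attention", "full_attention"]
#   decoder_sparse_step = 1
#
# every layer would use Qwen3NextSparseMoeBlock, and only the last layer would build a
# Qwen3NextAttention module.
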
class Qwen3NextDecoderLayer(Qwen3MoeDecoderLayer):
    def __init__(self, config: Qwen3NextConfig, layer_idx: int):
        nn.Module.__init__(self)
        self.hidden_size = config.hidden_size

        # token mixer
        self.layer_type = config.layer_types[layer_idx]
        if self.layer_type == "linear_attention":
            self.linear_attn = Qwen3NextGatedDeltaNet(config, layer_idx)
        elif self.layer_type == "full_attention":
            self.self_attn = Qwen3NextAttention(config, layer_idx)

        if (layer_idx not in config.mlp_only_layers) and (
            config.num_experts > 0 and (layer_idx + 1) % config.decoder_sparse_step == 0
        ):
            self.mlp = Qwen3NextSparseMoeBlock(config)
        else:
            self.mlp = Qwen3NextMLP(config, intermediate_size=config.intermediate_size)

        self.input_layernorm = Qwen3NextRMSNorm(config.hidden_size, eps=config.rms_norm_eps)
        self.post_attention_layernorm = Qwen3NextRMSNorm(config.hidden_size, eps=config.rms_norm_eps)

    def forward(
        self,
        hidden_states: torch.Tensor,
        position_embeddings: tuple[torch.Tensor, torch.Tensor],
        attention_mask: Optional[torch.Tensor] = None,
        position_ids: Optional[torch.LongTensor] = None,
        past_key_values: Optional[Cache] = None,
        cache_position: Optional[torch.LongTensor] = None,
        **kwargs: Unpack[FlashAttentionKwargs],
    ) -> torch.FloatTensor:
        residual = hidden_states
        hidden_states = self.input_layernorm(hidden_states)

        # Token Mixer
        if self.layer_type == "linear_attention":
            hidden_states = self.linear_attn(
                hidden_states=hidden_states,
                cache_params=past_key_values,
                cache_position=cache_position,
                attention_mask=attention_mask,
            )
        elif self.layer_type == "full_attention":
            # Self Attention
            hidden_states, _ = self.self_attn(
                hidden_states=hidden_states,
                attention_mask=attention_mask,
                position_ids=position_ids,
                past_key_values=past_key_values,
                cache_position=cache_position,
                position_embeddings=position_embeddings,
                **kwargs,
            )
        hidden_states = residual + hidden_states

        # Fully Connected
        residual = hidden_states
        hidden_states = self.post_attention_layernorm(hidden_states)
        hidden_states = self.mlp(hidden_states)
        # For the MoE layers, we need to unpack
        if isinstance(hidden_states, tuple):
            hidden_states, _ = hidden_states
        hidden_states = residual + hidden_states

        return hidden_states


class Qwen3NextPreTrainedModel(PreTrainedModel):
    config: Qwen3NextConfig
    base_model_prefix = "model"
    supports_gradient_checkpointing = True
    _no_split_modules = ["Qwen3NextDecoderLayer"]
    _skip_keys_device_placement = "past_key_values"
    _supports_flash_attn_2 = True
    _supports_sdpa = True
    _keys_to_ignore_on_load_unexpected = [r"^mtp.*"]
    _can_record_outputs = {
        "router_logits": OutputRecorder(Qwen3NextSparseMoeBlock, index=1),
        "hidden_states": Qwen3NextDecoderLayer,
        "attentions": Qwen3NextAttention,
    }
    _is_stateful = True

    def _init_weights(self, module):
        super()._init_weights(module)
        if isinstance(module, Qwen3NextGatedDeltaNet):
            module.dt_bias.data.fill_(1.0)
            module.A_log.data.uniform_(0, 16).log_()
        # We initialize with 0s to be 1 centered as the RMSNorm here does (1 + weight)
        elif isinstance(module, Qwen3NextRMSNorm):
            module.weight.data.zero_()

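# Note on the zero-init above: Qwen3NextRMSNorm inherits from Gemma3RMSNorm, which scales the
# normalized activations by (1 + weight), so a zero weight leaves a plain RMSNorm. Illustrative
# sketch (comments only, not executed):
#
#   norm = Qwen3NextRMSNorm(64, eps=1e-6)
#   x = torch.randn(2, 64)
#   # norm(x) ~= x * torch.rsqrt(x.pow(2).mean(-1, keepdim=True) + 1e-6)
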
class Qwen3NextModel(Qwen3NextPreTrainedModel):
    def __init__(self, config: Qwen3NextConfig):
        super().__init__(config)
        self.embed_tokens = nn.Embedding(config.vocab_size, config.hidden_size, config.pad_token_id)
        self.layers = nn.ModuleList(
            [Qwen3NextDecoderLayer(config, layer_idx) for layer_idx in range(config.num_hidden_layers)]
        )
        self.norm = Qwen3NextRMSNorm(config.hidden_size, eps=config.rms_norm_eps)
        self.rotary_emb = Qwen3NextRotaryEmbedding(config=config)
        self.gradient_checkpointing = False

        # Initialize weights and apply final processing
        self.post_init()

    @check_model_inputs
    @auto_docstring
    def forward(
        self,
        input_ids: Optional[torch.LongTensor] = None,
        attention_mask: Optional[torch.Tensor] = None,
        position_ids: Optional[torch.LongTensor] = None,
        past_key_values: Optional[Cache] = None,
        inputs_embeds: Optional[torch.FloatTensor] = None,
        use_cache: Optional[bool] = None,
        cache_position: Optional[torch.LongTensor] = None,
        **kwargs: Unpack[TransformersKwargs],
    ) -> MoeModelOutputWithPast:
        if (input_ids is None) ^ (inputs_embeds is not None):
            raise ValueError("You must specify exactly one of input_ids or inputs_embeds")

        if inputs_embeds is None:
            inputs_embeds = self.embed_tokens(input_ids)

        if use_cache and past_key_values is None:
            past_key_values = Qwen3NextDynamicCache(config=self.config)

        if cache_position is None:
            past_seen_tokens = past_key_values.get_seq_length() if past_key_values is not None else 0
            cache_position = torch.arange(
                past_seen_tokens, past_seen_tokens + inputs_embeds.shape[1], device=inputs_embeds.device
            )

        if position_ids is None:
            position_ids = cache_position.unsqueeze(0)

        causal_mask = create_causal_mask(
            config=self.config,
            input_embeds=inputs_embeds,
            attention_mask=attention_mask,
            cache_position=cache_position,
            past_key_values=past_key_values,
            position_ids=position_ids,
        )

        linear_attn_mask = self._update_linear_attn_mask(attention_mask, cache_position)

        hidden_states = inputs_embeds

        # create position embeddings to be shared across the decoder layers
        position_embeddings = self.rotary_emb(hidden_states, position_ids)

        for decoder_layer in self.layers[: self.config.num_hidden_layers]:
            layer_mask = linear_attn_mask if decoder_layer.layer_type == "linear_attention" else causal_mask

            hidden_states = decoder_layer(
                hidden_states,
                position_embeddings=position_embeddings,
                attention_mask=layer_mask,
                position_ids=position_ids,
                past_key_values=past_key_values,
                use_cache=use_cache,
                cache_position=cache_position,
                **kwargs,
            )

        hidden_states = self.norm(hidden_states)
        return MoeModelOutputWithPast(
            last_hidden_state=hidden_states,
            past_key_values=past_key_values,
        )

    def _update_linear_attn_mask(self, attention_mask, cache_position):
        """
        NOTE: Left-padding is used for the linear attention mask.
        No need for zeroing states when
            1. Cached forward
            2. Attending to all inputs
        """
        linear_attn_mask = attention_mask
        if cache_position[0] > 0 or (attention_mask is not None and torch.all(attention_mask == 1)):
            linear_attn_mask = None
        return linear_attn_mask

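# Illustrative sketch (comments only, not executed): the base model returns the last hidden states
# and keeps both kinds of per-layer state inside a Qwen3NextDynamicCache:
#
#   model = Qwen3NextModel.from_pretrained("Qwen/Qwen3-Next-80B-A3B-Instruct")
#   out = model(input_ids=input_ids, use_cache=True)
#   out.last_hidden_state.shape                        # (batch, seq_len, hidden_size)
#   type(out.past_key_values)                          # Qwen3NextDynamicCache
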
class Qwen3NextForCausalLM(MixtralForCausalLM):
    def __init__(self, config):
        super().__init__(config)
        self.num_experts = config.num_experts

    def forward(
        self,
        input_ids: Optional[torch.LongTensor] = None,
        attention_mask: Optional[torch.Tensor] = None,
        position_ids: Optional[torch.LongTensor] = None,
        past_key_values: Optional[Qwen3NextDynamicCache] = None,
        inputs_embeds: Optional[torch.FloatTensor] = None,
        labels: Optional[torch.LongTensor] = None,
        use_cache: Optional[bool] = None,
        output_router_logits: Optional[bool] = None,
        cache_position: Optional[torch.LongTensor] = None,
        logits_to_keep: Union[int, torch.Tensor] = 0,
        **kwargs: Unpack[TransformersKwargs],
    ) -> MoeCausalLMOutputWithPast:
        r"""
        labels (`torch.LongTensor` of shape `(batch_size, sequence_length)`, *optional*):
            Labels for computing the masked language modeling loss. Indices should either be in `[0, ...,
            config.vocab_size]` or -100 (see `input_ids` docstring). Tokens with indices set to `-100` are ignored
            (masked), the loss is only computed for the tokens with labels in `[0, ..., config.vocab_size]`.

        Example:

        ```python
        >>> from transformers import AutoTokenizer, Qwen3NextForCausalLM

        >>> model = Qwen3NextForCausalLM.from_pretrained("Qwen/Qwen3-Next-80B-A3B-Instruct")
        >>> tokenizer = AutoTokenizer.from_pretrained("Qwen/Qwen3-Next-80B-A3B-Instruct")

        >>> prompt = "Hey, are you conscious? Can you talk to me?"
        >>> inputs = tokenizer(prompt, return_tensors="pt")

        >>> # Generate
        >>> generate_ids = model.generate(inputs.input_ids, max_length=30)
        >>> tokenizer.batch_decode(generate_ids, skip_special_tokens=True, clean_up_tokenization_spaces=False)[0]
        "Hey, are you conscious? Can you talk to me?\nI'm not conscious, but I can talk to you."
        ```"""
        return super().forward(
            input_ids=input_ids,
            attention_mask=attention_mask,
            position_ids=position_ids,
            past_key_values=past_key_values,
            inputs_embeds=inputs_embeds,
            labels=labels,
            use_cache=use_cache,
            output_router_logits=output_router_logits,
            cache_position=cache_position,
            logits_to_keep=logits_to_keep,
            **kwargs,
        )


class Qwen3NextForSequenceClassification(LlamaForSequenceClassification):
    pass


class Qwen3NextForTokenClassification(LlamaForTokenClassification):
    pass


class Qwen3NextForQuestionAnswering(LlamaForQuestionAnswering):
    pass


__all__ = [
    "Qwen3NextForCausalLM",
    "Qwen3NextForQuestionAnswering",
    "Qwen3NextModel",
    "Qwen3NextPreTrainedModel",
    "Qwen3NextForSequenceClassification",
    "Qwen3NextForTokenClassification",
]