# 🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨 # This file was automatically generated from src/transformers/models/d_fine/modular_d_fine.py. # Do NOT edit this file manually as any edits will be overwritten by the generation of # the file from the modular. If any change should be done, please apply the change to the # modular_d_fine.py file directly. One of our CI enforces this. # 🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨 # coding=utf-8 # Copyright 2025 Baidu Inc and The HuggingFace Inc. team. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. 
import math from dataclasses import dataclass from typing import Any, Optional, Union import torch import torch.nn.functional as F import torch.nn.init as init from torch import Tensor, nn from ...activations import ACT2CLS, ACT2FN from ...image_transforms import center_to_corners_format, corners_to_center_format from ...modeling_outputs import BaseModelOutput from ...modeling_utils import PreTrainedModel from ...pytorch_utils import compile_compatible_method_lru_cache from ...utils import ModelOutput, auto_docstring, is_torchdynamo_compiling, torch_int from ...utils.backbone_utils import load_backbone from .configuration_d_fine import DFineConfig def multi_scale_deformable_attention_v2( value: Tensor, value_spatial_shapes: Tensor, sampling_locations: Tensor, attention_weights: Tensor, num_points_list: list[int], method="default", ) -> Tensor: batch_size, _, num_heads, hidden_dim = value.shape _, num_queries, num_heads, num_levels, num_points = sampling_locations.shape value_list = ( value.permute(0, 2, 3, 1) .flatten(0, 1) .split([height * width for height, width in value_spatial_shapes], dim=-1) ) # sampling_offsets [8, 480, 8, 12, 2] if method == "default": sampling_grids = 2 * sampling_locations - 1 elif method == "discrete": sampling_grids = sampling_locations sampling_grids = sampling_grids.permute(0, 2, 1, 3, 4).flatten(0, 1) sampling_grids = sampling_grids.split(num_points_list, dim=-2) sampling_value_list = [] for level_id, (height, width) in enumerate(value_spatial_shapes): # batch_size, height*width, num_heads, hidden_dim # -> batch_size, height*width, num_heads*hidden_dim # -> batch_size, num_heads*hidden_dim, height*width # -> batch_size*num_heads, hidden_dim, height, width value_l_ = value_list[level_id].reshape(batch_size * num_heads, hidden_dim, height, width) # batch_size, num_queries, num_heads, num_points, 2 # -> batch_size, num_heads, num_queries, num_points, 2 # -> batch_size*num_heads, num_queries, num_points, 2 sampling_grid_l_ = 
sampling_grids[level_id] # batch_size*num_heads, hidden_dim, num_queries, num_points if method == "default": sampling_value_l_ = nn.functional.grid_sample( value_l_, sampling_grid_l_, mode="bilinear", padding_mode="zeros", align_corners=False ) elif method == "discrete": sampling_coord = (sampling_grid_l_ * torch.tensor([[width, height]], device=value.device) + 0.5).to( torch.int64 ) # Separate clamping for x and y coordinates sampling_coord_x = sampling_coord[..., 0].clamp(0, width - 1) sampling_coord_y = sampling_coord[..., 1].clamp(0, height - 1) # Combine the clamped coordinates sampling_coord = torch.stack([sampling_coord_x, sampling_coord_y], dim=-1) sampling_coord = sampling_coord.reshape(batch_size * num_heads, num_queries * num_points_list[level_id], 2) sampling_idx = ( torch.arange(sampling_coord.shape[0], device=value.device) .unsqueeze(-1) .repeat(1, sampling_coord.shape[1]) ) sampling_value_l_ = value_l_[sampling_idx, :, sampling_coord[..., 1], sampling_coord[..., 0]] sampling_value_l_ = sampling_value_l_.permute(0, 2, 1).reshape( batch_size * num_heads, hidden_dim, num_queries, num_points_list[level_id] ) sampling_value_list.append(sampling_value_l_) # (batch_size, num_queries, num_heads, num_levels, num_points) # -> (batch_size, num_heads, num_queries, num_levels, num_points) # -> (batch_size, num_heads, 1, num_queries, num_levels*num_points) attention_weights = attention_weights.permute(0, 2, 1, 3).reshape( batch_size * num_heads, 1, num_queries, sum(num_points_list) ) output = ( (torch.concat(sampling_value_list, dim=-1) * attention_weights) .sum(-1) .view(batch_size, num_heads * hidden_dim, num_queries) ) return output.transpose(1, 2).contiguous() class DFineMultiscaleDeformableAttention(nn.Module): def __init__(self, config: DFineConfig): """ D-Fine version of multiscale deformable attention """ super().__init__() self.d_model = config.d_model self.n_heads = config.decoder_attention_heads self.n_levels = config.num_feature_levels 
self.offset_scale = config.decoder_offset_scale self.decoder_method = config.decoder_method self.n_points = config.decoder_n_points if isinstance(self.n_points, list): num_points_list = self.n_points else: num_points_list = [self.n_points for _ in range(self.n_levels)] self.num_points_list = num_points_list num_points_scale = [1 / n for n in self.num_points_list for _ in range(n)] self.register_buffer("num_points_scale", torch.tensor(num_points_scale, dtype=torch.float32)) self.total_points = self.n_heads * sum(self.num_points_list) self.sampling_offsets = nn.Linear(self.d_model, self.total_points * 2) self.attention_weights = nn.Linear(self.d_model, self.total_points) self.ms_deformable_attn_core = multi_scale_deformable_attention_v2 def forward( self, hidden_states: torch.Tensor, attention_mask: Optional[torch.Tensor] = None, reference_points=None, encoder_hidden_states=None, spatial_shapes=None, spatial_shapes_list=None, ) -> tuple[torch.Tensor, torch.Tensor]: batch_size, num_queries, _ = hidden_states.shape batch_size, sequence_length, _ = encoder_hidden_states.shape if not is_torchdynamo_compiling() and (spatial_shapes[:, 0] * spatial_shapes[:, 1]).sum() != sequence_length: raise ValueError( "Make sure to align the spatial shapes with the sequence length of the encoder hidden states" ) # Reshape for multi-head attention value = encoder_hidden_states.reshape(batch_size, sequence_length, self.n_heads, self.d_model // self.n_heads) if attention_mask is not None: value = value.masked_fill(~attention_mask[..., None], float(0)) sampling_offsets: torch.Tensor = self.sampling_offsets(hidden_states) sampling_offsets = sampling_offsets.reshape( batch_size, num_queries, self.n_heads, sum(self.num_points_list), 2 ) attention_weights = self.attention_weights(hidden_states).reshape( batch_size, num_queries, self.n_heads, sum(self.num_points_list) ) attention_weights = F.softmax(attention_weights, dim=-1) if reference_points.shape[-1] == 2: offset_normalizer = 
torch.tensor(spatial_shapes) offset_normalizer = offset_normalizer.flip([1]).reshape(1, 1, 1, self.n_levels, 1, 2) sampling_locations = ( reference_points.reshape(batch_size, sequence_length, 1, self.n_levels, 1, 2) + sampling_offsets / offset_normalizer ) elif reference_points.shape[-1] == 4: # reference_points [8, 480, None, 1, 4] # sampling_offsets [8, 480, 8, 12, 2] num_points_scale = self.num_points_scale.to(dtype=hidden_states.dtype).unsqueeze(-1) offset = sampling_offsets * num_points_scale * reference_points[:, :, None, :, 2:] * self.offset_scale sampling_locations = reference_points[:, :, None, :, :2] + offset else: raise ValueError( f"Last dim of reference_points must be 2 or 4, but get {reference_points.shape[-1]} instead." ) output = self.ms_deformable_attn_core( value, spatial_shapes_list, sampling_locations, attention_weights, self.num_points_list, self.decoder_method, ) return output, attention_weights class DFineGate(nn.Module): def __init__(self, d_model: int): super().__init__() self.gate = nn.Linear(2 * d_model, 2 * d_model) self.norm = nn.LayerNorm(d_model) def forward(self, second_residual: torch.Tensor, hidden_states: torch.Tensor) -> torch.Tensor: gate_input = torch.cat([second_residual, hidden_states], dim=-1) gates = torch.sigmoid(self.gate(gate_input)) gate1, gate2 = gates.chunk(2, dim=-1) hidden_states = self.norm(gate1 * second_residual + gate2 * hidden_states) return hidden_states class DFineMultiheadAttention(nn.Module): """ Multi-headed attention from 'Attention Is All You Need' paper. Here, we add position embeddings to the queries and keys (as explained in the Deformable DETR paper). 
""" def __init__( self, embed_dim: int, num_heads: int, dropout: float = 0.0, bias: bool = True, ): super().__init__() self.embed_dim = embed_dim self.num_heads = num_heads self.dropout = dropout self.head_dim = embed_dim // num_heads if self.head_dim * num_heads != self.embed_dim: raise ValueError( f"embed_dim must be divisible by num_heads (got `embed_dim`: {self.embed_dim} and `num_heads`:" f" {num_heads})." ) self.scaling = self.head_dim**-0.5 self.k_proj = nn.Linear(embed_dim, embed_dim, bias=bias) self.v_proj = nn.Linear(embed_dim, embed_dim, bias=bias) self.q_proj = nn.Linear(embed_dim, embed_dim, bias=bias) self.out_proj = nn.Linear(embed_dim, embed_dim, bias=bias) def _reshape(self, tensor: torch.Tensor, seq_len: int, batch_size: int): return tensor.view(batch_size, seq_len, self.num_heads, self.head_dim).transpose(1, 2).contiguous() def with_pos_embed(self, tensor: torch.Tensor, position_embeddings: Optional[Tensor]): return tensor if position_embeddings is None else tensor + position_embeddings def forward( self, hidden_states: torch.Tensor, attention_mask: Optional[torch.Tensor] = None, position_embeddings: Optional[torch.Tensor] = None, output_attentions: bool = False, ) -> tuple[torch.Tensor, Optional[torch.Tensor], Optional[tuple[torch.Tensor]]]: """Input shape: Batch x Time x Channel""" batch_size, target_len, embed_dim = hidden_states.size() # add position embeddings to the hidden states before projecting to queries and keys if position_embeddings is not None: hidden_states_original = hidden_states hidden_states = self.with_pos_embed(hidden_states, position_embeddings) # get queries, keys and values query_states = self.q_proj(hidden_states) * self.scaling key_states = self._reshape(self.k_proj(hidden_states), -1, batch_size) value_states = self._reshape(self.v_proj(hidden_states_original), -1, batch_size) proj_shape = (batch_size * self.num_heads, -1, self.head_dim) query_states = self._reshape(query_states, target_len, 
batch_size).view(*proj_shape) key_states = key_states.view(*proj_shape) value_states = value_states.view(*proj_shape) source_len = key_states.size(1) attn_weights = torch.bmm(query_states, key_states.transpose(1, 2)) if attn_weights.size() != (batch_size * self.num_heads, target_len, source_len): raise ValueError( f"Attention weights should be of size {(batch_size * self.num_heads, target_len, source_len)}, but is" f" {attn_weights.size()}" ) # expand attention_mask if attention_mask is not None: # [seq_len, seq_len] -> [batch_size, 1, target_seq_len, source_seq_len] attention_mask = attention_mask.expand(batch_size, 1, *attention_mask.size()) if attention_mask is not None: if attention_mask.size() != (batch_size, 1, target_len, source_len): raise ValueError( f"Attention mask should be of size {(batch_size, 1, target_len, source_len)}, but is" f" {attention_mask.size()}" ) if attention_mask.dtype == torch.bool: attention_mask = torch.zeros_like(attention_mask, dtype=attn_weights.dtype).masked_fill_( attention_mask, -torch.inf ) attn_weights = attn_weights.view(batch_size, self.num_heads, target_len, source_len) + attention_mask attn_weights = attn_weights.view(batch_size * self.num_heads, target_len, source_len) attn_weights = nn.functional.softmax(attn_weights, dim=-1) if output_attentions: # this operation is a bit awkward, but it's required to # make sure that attn_weights keeps its gradient. 
# In order to do so, attn_weights have to reshaped # twice and have to be reused in the following attn_weights_reshaped = attn_weights.view(batch_size, self.num_heads, target_len, source_len) attn_weights = attn_weights_reshaped.view(batch_size * self.num_heads, target_len, source_len) else: attn_weights_reshaped = None attn_probs = nn.functional.dropout(attn_weights, p=self.dropout, training=self.training) attn_output = torch.bmm(attn_probs, value_states) if attn_output.size() != (batch_size * self.num_heads, target_len, self.head_dim): raise ValueError( f"`attn_output` should be of size {(batch_size, self.num_heads, target_len, self.head_dim)}, but is" f" {attn_output.size()}" ) attn_output = attn_output.view(batch_size, self.num_heads, target_len, self.head_dim) attn_output = attn_output.transpose(1, 2) attn_output = attn_output.reshape(batch_size, target_len, embed_dim) attn_output = self.out_proj(attn_output) return attn_output, attn_weights_reshaped class DFineDecoderLayer(nn.Module): def __init__(self, config: DFineConfig): super().__init__() # self-attention self.self_attn = DFineMultiheadAttention( embed_dim=config.d_model, num_heads=config.decoder_attention_heads, dropout=config.attention_dropout, ) self.dropout = config.dropout self.activation_fn = ACT2FN[config.decoder_activation_function] self.activation_dropout = config.activation_dropout self.self_attn_layer_norm = nn.LayerNorm(config.d_model, eps=config.layer_norm_eps) # override the encoder attention module with d-fine version self.encoder_attn = DFineMultiscaleDeformableAttention(config=config) # feedforward neural networks self.fc1 = nn.Linear(config.d_model, config.decoder_ffn_dim) self.fc2 = nn.Linear(config.decoder_ffn_dim, config.d_model) self.final_layer_norm = nn.LayerNorm(config.d_model, eps=config.layer_norm_eps) # gate self.gateway = DFineGate(config.d_model) def forward( self, hidden_states: torch.Tensor, position_embeddings: Optional[torch.Tensor] = None, reference_points=None, 
spatial_shapes=None, spatial_shapes_list=None, encoder_hidden_states: Optional[torch.Tensor] = None, encoder_attention_mask: Optional[torch.Tensor] = None, output_attentions: Optional[bool] = False, ) -> tuple[torch.Tensor, Any, Any]: """ Args: hidden_states (`torch.FloatTensor`): Input to the layer of shape `(seq_len, batch, embed_dim)`. position_embeddings (`torch.FloatTensor`, *optional*): Position embeddings that are added to the queries and keys in the self-attention layer. reference_points (`torch.FloatTensor`, *optional*): Reference points. spatial_shapes (`torch.LongTensor`, *optional*): Spatial shapes. level_start_index (`torch.LongTensor`, *optional*): Level start index. encoder_hidden_states (`torch.FloatTensor`): cross attention input to the layer of shape `(seq_len, batch, embed_dim)` encoder_attention_mask (`torch.FloatTensor`): encoder attention mask of size `(batch, 1, target_len, source_len)` where padding elements are indicated by very large negative values. output_attentions (`bool`, *optional*): Whether or not to return the attentions tensors of all attention layers. See `attentions` under returned tensors for more detail. 
""" # Self Attention hidden_states_2, self_attn_weights = self.self_attn( hidden_states=hidden_states, attention_mask=encoder_attention_mask, position_embeddings=position_embeddings, output_attentions=output_attentions, ) hidden_states_2 = nn.functional.dropout(hidden_states_2, p=self.dropout, training=self.training) hidden_states = hidden_states + hidden_states_2 hidden_states = self.self_attn_layer_norm(hidden_states) residual = hidden_states # Cross-Attention cross_attn_weights = None hidden_states = hidden_states if position_embeddings is None else hidden_states + position_embeddings hidden_states_2, cross_attn_weights = self.encoder_attn( hidden_states=hidden_states, encoder_hidden_states=encoder_hidden_states, reference_points=reference_points, spatial_shapes=spatial_shapes, spatial_shapes_list=spatial_shapes_list, ) hidden_states_2 = nn.functional.dropout(hidden_states_2, p=self.dropout, training=self.training) hidden_states = self.gateway(residual, hidden_states_2) # Fully Connected hidden_states_2 = self.activation_fn(self.fc1(hidden_states)) hidden_states_2 = nn.functional.dropout(hidden_states_2, p=self.activation_dropout, training=self.training) hidden_states_2 = self.fc2(hidden_states_2) hidden_states_2 = nn.functional.dropout(hidden_states_2, p=self.dropout, training=self.training) hidden_states = hidden_states + hidden_states_2 hidden_states = self.final_layer_norm(hidden_states.clamp(min=-65504, max=65504)) outputs = (hidden_states,) if output_attentions: outputs += (self_attn_weights, cross_attn_weights) return outputs @auto_docstring class DFinePreTrainedModel(PreTrainedModel): config: DFineConfig base_model_prefix = "d_fine" main_input_name = "pixel_values" _no_split_modules = [r"DFineHybridEncoder", r"DFineDecoderLayer"] def _init_weights(self, module): """Initialize the weights""" # initialize linear layer bias value according to a given probability value. 
if isinstance(module, (DFineForObjectDetection, DFineDecoder)): if module.class_embed is not None: for layer in module.class_embed: prior_prob = self.config.initializer_bias_prior_prob or 1 / (self.config.num_labels + 1) bias = float(-math.log((1 - prior_prob) / prior_prob)) nn.init.xavier_uniform_(layer.weight) nn.init.constant_(layer.bias, bias) if module.bbox_embed is not None: for layer in module.bbox_embed: nn.init.constant_(layer.layers[-1].weight, 0) nn.init.constant_(layer.layers[-1].bias, 0) if hasattr(module, "reg_scale"): module.reg_scale.fill_(self.config.reg_scale) if hasattr(module, "up"): module.up.fill_(self.config.up) if isinstance(module, DFineMultiscaleDeformableAttention): nn.init.constant_(module.sampling_offsets.weight.data, 0.0) default_dtype = torch.get_default_dtype() thetas = torch.arange(module.n_heads, dtype=torch.int64).to(default_dtype) * ( 2.0 * math.pi / module.n_heads ) grid_init = torch.stack([thetas.cos(), thetas.sin()], -1) grid_init = grid_init / grid_init.abs().max(-1, keepdim=True).values grid_init = grid_init.reshape(module.n_heads, 1, 2).tile([1, sum(module.num_points_list), 1]) scaling = torch.concat([torch.arange(1, n + 1) for n in module.num_points_list]).reshape(1, -1, 1) grid_init *= scaling with torch.no_grad(): module.sampling_offsets.bias.data[...] 
= grid_init.flatten() nn.init.constant_(module.attention_weights.weight.data, 0.0) nn.init.constant_(module.attention_weights.bias.data, 0.0) if isinstance(module, DFineModel): prior_prob = self.config.initializer_bias_prior_prob or 1 / (self.config.num_labels + 1) bias = float(-math.log((1 - prior_prob) / prior_prob)) nn.init.xavier_uniform_(module.enc_score_head.weight) nn.init.constant_(module.enc_score_head.bias, bias) if isinstance(module, (nn.Linear, nn.Conv2d, nn.BatchNorm2d)): module.weight.data.normal_(mean=0.0, std=self.config.initializer_range) if module.bias is not None: module.bias.data.zero_() if isinstance(module, DFineGate): bias = float(-math.log((1 - 0.5) / 0.5)) init.constant_(module.gate.bias, bias) init.constant_(module.gate.weight, 0) if isinstance(module, DFineLQE): init.constant_(module.reg_conf.layers[-1].bias, 0) init.constant_(module.reg_conf.layers[-1].weight, 0) if isinstance(module, nn.LayerNorm): module.weight.data.fill_(1.0) module.bias.data.zero_() if hasattr(module, "weight_embedding") and self.config.learn_initial_query: nn.init.xavier_uniform_(module.weight_embedding.weight) if hasattr(module, "denoising_class_embed") and self.config.num_denoising > 0: nn.init.xavier_uniform_(module.denoising_class_embed.weight) class DFineIntegral(nn.Module): """ A static layer that calculates integral results from a distribution. This layer computes the target location using the formula: `sum{Pr(n) * W(n)}`, where Pr(n) is the softmax probability vector representing the discrete distribution, and W(n) is the non-uniform Weighting Function. Args: max_num_bins (int): Max number of the discrete bins. Default is 32. It can be adjusted based on the dataset or task requirements. 
""" def __init__(self, config: DFineConfig): super().__init__() self.max_num_bins = config.max_num_bins def forward(self, pred_corners: torch.Tensor, project: torch.Tensor) -> torch.Tensor: batch_size, num_queries, _ = pred_corners.shape pred_corners = F.softmax(pred_corners.reshape(-1, self.max_num_bins + 1), dim=1) pred_corners = F.linear(pred_corners, project.to(pred_corners.device)).reshape(-1, 4) pred_corners = pred_corners.reshape(batch_size, num_queries, -1) return pred_corners @dataclass @auto_docstring( custom_intro=""" Base class for outputs of the DFineDecoder. This class adds two attributes to BaseModelOutputWithCrossAttentions, namely: - a stacked tensor of intermediate decoder hidden states (i.e. the output of each decoder layer) - a stacked tensor of intermediate reference points. """ ) class DFineDecoderOutput(ModelOutput): r""" intermediate_hidden_states (`torch.FloatTensor` of shape `(batch_size, config.decoder_layers, num_queries, hidden_size)`): Stacked intermediate hidden states (output of each layer of the decoder). intermediate_logits (`torch.FloatTensor` of shape `(batch_size, config.decoder_layers, sequence_length, config.num_labels)`): Stacked intermediate logits (logits of each layer of the decoder). intermediate_reference_points (`torch.FloatTensor` of shape `(batch_size, config.decoder_layers, sequence_length, hidden_size)`): Stacked intermediate reference points (reference points of each layer of the decoder). intermediate_predicted_corners (`torch.FloatTensor` of shape `(batch_size, config.decoder_layers, num_queries, 4)`): Stacked intermediate predicted corners (predicted corners of each layer of the decoder). initial_reference_points (`torch.FloatTensor` of shape `(batch_size, config.decoder_layers, num_queries, 4)`): Stacked initial reference points (initial reference points of each layer of the decoder). 
cross_attentions (`tuple(torch.FloatTensor)`, *optional*, returned when `output_attentions=True` and `config.add_cross_attention=True` is passed or when `config.output_attentions=True`): Tuple of `torch.FloatTensor` (one for each layer) of shape `(batch_size, num_heads, sequence_length, sequence_length)`. Attentions weights of the decoder's cross-attention layer, after the attention softmax, used to compute the weighted average in the cross-attention heads. """ last_hidden_state: Optional[torch.FloatTensor] = None intermediate_hidden_states: Optional[torch.FloatTensor] = None intermediate_logits: Optional[torch.FloatTensor] = None intermediate_reference_points: Optional[torch.FloatTensor] = None intermediate_predicted_corners: Optional[torch.FloatTensor] = None initial_reference_points: Optional[torch.FloatTensor] = None hidden_states: Optional[tuple[torch.FloatTensor]] = None attentions: Optional[tuple[torch.FloatTensor]] = None cross_attentions: Optional[tuple[torch.FloatTensor]] = None def inverse_sigmoid(x, eps=1e-5): x = x.clamp(min=0, max=1) x1 = x.clamp(min=eps) x2 = (1 - x).clamp(min=eps) return torch.log(x1 / x2) def weighting_function(max_num_bins: int, up: torch.Tensor, reg_scale: int) -> torch.Tensor: """ Generates the non-uniform Weighting Function W(n) for bounding box regression. Args: max_num_bins (int): Max number of the discrete bins. up (Tensor): Controls upper bounds of the sequence, where maximum offset is ±up * H / W. reg_scale (float): Controls the curvature of the Weighting Function. Larger values result in flatter weights near the central axis W(max_num_bins/2)=0 and steeper weights at both ends. Returns: Tensor: Sequence of Weighting Function. 
""" upper_bound1 = abs(up[0]) * abs(reg_scale) upper_bound2 = abs(up[0]) * abs(reg_scale) * 2 step = (upper_bound1 + 1) ** (2 / (max_num_bins - 2)) left_values = [-((step) ** i) + 1 for i in range(max_num_bins // 2 - 1, 0, -1)] right_values = [(step) ** i - 1 for i in range(1, max_num_bins // 2)] values = [-upper_bound2] + left_values + [torch.zeros_like(up[0][None])] + right_values + [upper_bound2] values = torch.cat(values, 0) return values def distance2bbox(points, distance: torch.Tensor, reg_scale: float) -> torch.Tensor: """ Decodes edge-distances into bounding box coordinates. Args: points (`torch.Tensor`): (batch_size, num_boxes, 4) or (num_boxes, 4) format, representing [x_center, y_center, width, height] distance (`torch.Tensor`): (batch_size, num_boxes, 4) or (num_boxes, 4), representing distances from the point to the left, top, right, and bottom boundaries. reg_scale (`float`): Controls the curvature of the Weighting Function. Returns: `torch.Tensor`: Bounding boxes in (batch_size, num_boxes, 4) or (num_boxes, 4) format, representing [x_center, y_center, width, height] """ reg_scale = abs(reg_scale) top_left_x = points[..., 0] - (0.5 * reg_scale + distance[..., 0]) * (points[..., 2] / reg_scale) top_left_y = points[..., 1] - (0.5 * reg_scale + distance[..., 1]) * (points[..., 3] / reg_scale) bottom_right_x = points[..., 0] + (0.5 * reg_scale + distance[..., 2]) * (points[..., 2] / reg_scale) bottom_right_y = points[..., 1] + (0.5 * reg_scale + distance[..., 3]) * (points[..., 3] / reg_scale) bboxes = torch.stack([top_left_x, top_left_y, bottom_right_x, bottom_right_y], -1) return corners_to_center_format(bboxes) class DFineDecoder(DFinePreTrainedModel): """ D-FINE Decoder implementing Fine-grained Distribution Refinement (FDR). 
This decoder refines object detection predictions through iterative updates across multiple layers, utilizing attention mechanisms, location quality estimators, and distribution refinement techniques to improve bounding box accuracy and robustness. """ def __init__(self, config: DFineConfig): super().__init__(config) self.eval_idx = config.eval_idx if config.eval_idx >= 0 else config.decoder_layers + config.eval_idx self.dropout = config.dropout self.layers = nn.ModuleList( [DFineDecoderLayer(config) for _ in range(config.decoder_layers)] + [DFineDecoderLayer(config) for _ in range(config.decoder_layers - self.eval_idx - 1)] ) self.query_pos_head = DFineMLPPredictionHead(config, 4, 2 * config.d_model, config.d_model, num_layers=2) # hack implementation for iterative bounding box refinement and two-stage Deformable DETR self.bbox_embed = None self.class_embed = None self.reg_scale = nn.Parameter(torch.tensor([config.reg_scale]), requires_grad=False) self.max_num_bins = config.max_num_bins self.d_model = config.d_model self.layer_scale = config.layer_scale self.pre_bbox_head = DFineMLP(config.hidden_size, config.hidden_size, 4, 3) self.integral = DFineIntegral(config) self.num_head = config.decoder_attention_heads self.up = nn.Parameter(torch.tensor([config.up]), requires_grad=False) self.lqe_layers = nn.ModuleList([DFineLQE(config) for _ in range(config.decoder_layers)]) # Initialize weights and apply final processing self.post_init() def forward( self, encoder_hidden_states: torch.Tensor, reference_points: torch.Tensor, inputs_embeds: torch.Tensor, spatial_shapes, level_start_index=None, spatial_shapes_list=None, output_hidden_states=None, encoder_attention_mask=None, memory_mask=None, output_attentions=None, return_dict=None, ) -> DFineDecoderOutput: r""" Args: inputs_embeds (`torch.FloatTensor` of shape `(batch_size, num_queries, hidden_size)`): The query embeddings that are passed into the decoder. 
encoder_hidden_states (`torch.FloatTensor` of shape `(batch_size, sequence_length, hidden_size)`, *optional*): Sequence of hidden-states at the output of the last layer of the encoder. Used in the cross-attention of the decoder. encoder_attention_mask (`torch.LongTensor` of shape `(batch_size, sequence_length)`, *optional*): Mask to avoid performing cross-attention on padding pixel_values of the encoder. Mask values selected in `[0, 1]`: - 1 for pixels that are real (i.e. **not masked**), - 0 for pixels that are padding (i.e. **masked**). position_embeddings (`torch.FloatTensor` of shape `(batch_size, num_queries, hidden_size)`, *optional*): Position embeddings that are added to the queries and keys in each self-attention layer. reference_points (`torch.FloatTensor` of shape `(batch_size, num_queries, 4)` is `as_two_stage` else `(batch_size, num_queries, 2)` or , *optional*): Reference point in range `[0, 1]`, top-left (0,0), bottom-right (1, 1), including padding area. spatial_shapes (`torch.FloatTensor` of shape `(num_feature_levels, 2)`): Spatial shapes of the feature maps. level_start_index (`torch.LongTensor` of shape `(num_feature_levels)`, *optional*): Indexes for the start of each feature level. In range `[0, sequence_length]`. valid_ratios (`torch.FloatTensor` of shape `(batch_size, num_feature_levels, 2)`, *optional*): Ratio of valid area in each feature level. output_attentions (`bool`, *optional*): Whether or not to return the attentions tensors of all attention layers. See `attentions` under returned tensors for more detail. output_hidden_states (`bool`, *optional*): Whether or not to return the hidden states of all layers. See `hidden_states` under returned tensors for more detail. return_dict (`bool`, *optional*): Whether or not to return a [`~file_utils.ModelOutput`] instead of a plain tuple. 
""" output_attentions = output_attentions if output_attentions is not None else self.config.output_attentions output_hidden_states = ( output_hidden_states if output_hidden_states is not None else self.config.output_hidden_states ) return_dict = return_dict if return_dict is not None else self.config.use_return_dict if inputs_embeds is not None: hidden_states = inputs_embeds # decoder layers all_hidden_states = () if output_hidden_states else None all_self_attns = () if output_attentions else None all_cross_attentions = () if (output_attentions and encoder_hidden_states is not None) else None intermediate = () intermediate_reference_points = () intermediate_logits = () intermediate_predicted_corners = () initial_reference_points = () output_detach = pred_corners_undetach = 0 project = weighting_function(self.max_num_bins, self.up, self.reg_scale) ref_points_detach = F.sigmoid(reference_points) for i, decoder_layer in enumerate(self.layers): ref_points_input = ref_points_detach.unsqueeze(2) query_pos_embed = self.query_pos_head(ref_points_detach).clamp(min=-10, max=10) if output_hidden_states: all_hidden_states += (hidden_states,) output = decoder_layer( hidden_states=hidden_states, position_embeddings=query_pos_embed, reference_points=ref_points_input, spatial_shapes=spatial_shapes, spatial_shapes_list=spatial_shapes_list, encoder_hidden_states=encoder_hidden_states, encoder_attention_mask=encoder_attention_mask, output_attentions=output_attentions, ) hidden_states = output[0] if i == 0: # Initial bounding box predictions with inverse sigmoid refinement new_reference_points = F.sigmoid(self.pre_bbox_head(output[0]) + inverse_sigmoid(ref_points_detach)) ref_points_initial = new_reference_points.detach() # Refine bounding box corners using FDR, integrating previous layer's corrections if self.bbox_embed is not None: pred_corners = self.bbox_embed[i](hidden_states + output_detach) + pred_corners_undetach inter_ref_bbox = distance2bbox( ref_points_initial, 
self.integral(pred_corners, project), self.reg_scale ) pred_corners_undetach = pred_corners ref_points_detach = inter_ref_bbox.detach() output_detach = hidden_states.detach() intermediate += (hidden_states,) if self.class_embed is not None and (self.training or i == self.eval_idx): scores = self.class_embed[i](hidden_states) # Add initial logits and reference points with pre-bbox head if i == 0: intermediate_logits += (scores,) intermediate_reference_points += (new_reference_points,) # Lqe does not affect the performance here. scores = self.lqe_layers[i](scores, pred_corners) intermediate_logits += (scores,) intermediate_reference_points += (inter_ref_bbox,) initial_reference_points += (ref_points_initial,) intermediate_predicted_corners += (pred_corners,) if output_attentions: all_self_attns += (output[1],) if encoder_hidden_states is not None: all_cross_attentions += (output[2],) # Keep batch_size as first dimension intermediate = torch.stack(intermediate) if self.class_embed is not None and self.bbox_embed is not None: intermediate_logits = torch.stack(intermediate_logits, dim=1) intermediate_predicted_corners = torch.stack(intermediate_predicted_corners, dim=1) initial_reference_points = torch.stack(initial_reference_points, dim=1) intermediate_reference_points = torch.stack(intermediate_reference_points, dim=1) # add hidden states from the last decoder layer if output_hidden_states: all_hidden_states += (hidden_states,) if not return_dict: return tuple( v for v in [ hidden_states, intermediate, intermediate_logits, intermediate_reference_points, intermediate_predicted_corners, initial_reference_points, all_hidden_states, all_self_attns, all_cross_attentions, ] if v is not None ) return DFineDecoderOutput( last_hidden_state=hidden_states, intermediate_hidden_states=intermediate, intermediate_logits=intermediate_logits, intermediate_reference_points=intermediate_reference_points, intermediate_predicted_corners=intermediate_predicted_corners, 
@dataclass
@auto_docstring(
    custom_intro="""
    Base class for outputs of the RT-DETR encoder-decoder model.
    """
)
class DFineModelOutput(ModelOutput):
    r"""
    last_hidden_state (`torch.FloatTensor` of shape `(batch_size, num_queries, hidden_size)`):
        Sequence of hidden-states at the output of the last layer of the decoder of the model.
    intermediate_hidden_states (`torch.FloatTensor` of shape `(batch_size, config.decoder_layers, num_queries, hidden_size)`):
        Stacked intermediate hidden states (output of each layer of the decoder).
    intermediate_logits (`torch.FloatTensor` of shape `(batch_size, config.decoder_layers, num_queries, config.num_labels)`):
        Stacked intermediate logits. The decoder only collects logits for a layer while training or when the layer
        index equals its `eval_idx`, so the layer axis can be shorter than `config.decoder_layers`.
    intermediate_reference_points (`torch.FloatTensor` of shape `(batch_size, config.decoder_layers, num_queries, 4)`):
        Stacked intermediate reference points (reference points of each layer of the decoder).
    intermediate_predicted_corners (`torch.FloatTensor` of shape `(batch_size, config.decoder_layers, num_queries, 4)`):
        Stacked intermediate predicted corners (predicted corners of each layer of the decoder).
    initial_reference_points (`torch.FloatTensor` of shape `(batch_size, config.decoder_layers, num_queries, 4)`):
        Stacked initial reference points: the decoder appends the (detached) first-layer reference points once per
        collected layer and stacks them along dimension 1.
    init_reference_points (`torch.FloatTensor` of shape `(batch_size, num_queries, 4)`):
        Initial reference points sent through the Transformer decoder. They are passed un-activated; the decoder
        applies the sigmoid itself.
    enc_topk_logits (`torch.FloatTensor` of shape `(batch_size, num_queries, config.num_labels)`):
        Encoder class logits gathered at the `config.num_queries` positions with the highest maximum class score.
    enc_topk_bboxes (`torch.FloatTensor` of shape `(batch_size, num_queries, 4)`):
        Sigmoid-activated box coordinates predicted by the encoder for those same top-scoring positions.
    enc_outputs_class (`torch.FloatTensor` of shape `(batch_size, sequence_length, config.num_labels)`):
        Class logits produced by the encoder score head for every position of the flattened feature maps. The top
        `config.num_queries` positions are picked from these scores.
    enc_outputs_coord_logits (`torch.FloatTensor` of shape `(batch_size, sequence_length, 4)`):
        Un-activated box coordinates (encoder box head output plus anchors) for every position of the flattened
        feature maps.
    denoising_meta_values (`dict`):
        Extra dictionary for the denoising related values (`dn_positive_idx`, `dn_num_group`, `dn_num_split`).
        `None` when no denoising group was built.
    """

    last_hidden_state: Optional[torch.FloatTensor] = None
    # NOTE(review): the decoder builds this with `torch.stack(intermediate)` (default dim=0), so the layer axis may
    # actually come first rather than second as documented above -- confirm.
    intermediate_hidden_states: Optional[torch.FloatTensor] = None
    intermediate_logits: Optional[torch.FloatTensor] = None
    intermediate_reference_points: Optional[torch.FloatTensor] = None
    intermediate_predicted_corners: Optional[torch.FloatTensor] = None
    initial_reference_points: Optional[torch.FloatTensor] = None
    # The next three fields are forwarded from the decoder output (`hidden_states`, `attentions`, `cross_attentions`).
    decoder_hidden_states: Optional[tuple[torch.FloatTensor]] = None
    decoder_attentions: Optional[tuple[torch.FloatTensor]] = None
    cross_attentions: Optional[tuple[torch.FloatTensor]] = None
    # The next three fields are forwarded from the encoder output.
    encoder_last_hidden_state: Optional[torch.FloatTensor] = None
    encoder_hidden_states: Optional[tuple[torch.FloatTensor]] = None
    encoder_attentions: Optional[tuple[torch.FloatTensor]] = None
    init_reference_points: Optional[torch.FloatTensor] = None
    enc_topk_logits: Optional[torch.FloatTensor] = None
    enc_topk_bboxes: Optional[torch.FloatTensor] = None
    enc_outputs_class: Optional[torch.FloatTensor] = None
    enc_outputs_coord_logits: Optional[torch.FloatTensor] = None
    denoising_meta_values: Optional[dict] = None
class DFineFrozenBatchNorm2d(nn.Module):
    """
    BatchNorm2d where the batch statistics and the affine parameters are fixed.

    Copy-paste from torchvision.misc.ops with added eps before rsqrt, without which any other models than
    torchvision.models.resnet[18,34,50,101] produce nans.

    Args:
        n (`int`):
            Number of channels normalized by this layer.
        eps (`float`, *optional*, defaults to 1e-5):
            Value added to the running variance before taking the reciprocal square root.
    """

    def __init__(self, n, eps=1e-5):
        super().__init__()
        # Plain attribute (not a buffer) so the state dict layout is unchanged.
        self.eps = eps
        self.register_buffer("weight", torch.ones(n))
        self.register_buffer("bias", torch.zeros(n))
        self.register_buffer("running_mean", torch.zeros(n))
        self.register_buffer("running_var", torch.ones(n))

    def _load_from_state_dict(
        self, state_dict, prefix, local_metadata, strict, missing_keys, unexpected_keys, error_msgs
    ):
        # A regular BatchNorm2d checkpoint carries `num_batches_tracked`, which has no counterpart here;
        # drop it so loading does not report an unexpected key.
        num_batches_tracked_key = prefix + "num_batches_tracked"
        if num_batches_tracked_key in state_dict:
            del state_dict[num_batches_tracked_key]

        super()._load_from_state_dict(
            state_dict, prefix, local_metadata, strict, missing_keys, unexpected_keys, error_msgs
        )

    def forward(self, x):
        """Apply the frozen normalization as a per-channel scale and shift of `x`."""
        # move reshapes to the beginning
        # to make it user-friendly
        weight = self.weight.reshape(1, -1, 1, 1)
        bias = self.bias.reshape(1, -1, 1, 1)
        running_var = self.running_var.reshape(1, -1, 1, 1)
        running_mean = self.running_mean.reshape(1, -1, 1, 1)
        scale = weight * (running_var + self.eps).rsqrt()
        bias = bias - running_mean * scale
        return x * scale + bias


def replace_batch_norm(model):
    r"""
    Recursively replace all `torch.nn.BatchNorm2d` with `DFineFrozenBatchNorm2d`.

    The replacement happens in place. Parameters and running statistics are copied over unless they are missing
    (e.g. `affine=False`) or live on the `meta` device, in which case the frozen layer keeps its defaults.

    Args:
        model (torch.nn.Module):
            input model
    """
    meta_device = torch.device("meta")
    for name, module in model.named_children():
        if isinstance(module, nn.BatchNorm2d):
            new_module = DFineFrozenBatchNorm2d(module.num_features)

            copies = (
                (new_module.weight, module.weight),
                (new_module.bias, module.bias),
                (new_module.running_mean, module.running_mean),
                (new_module.running_var, module.running_var),
            )
            for target, source in copies:
                # `source` is None for affine=False / track_running_stats=False layers.
                if source is not None and source.device != meta_device:
                    target.data.copy_(source)

            model._modules[name] = new_module

        # Descend into containers; a plain BatchNorm2d has no children so this is a no-op for it.
        if next(module.children(), None) is not None:
            replace_batch_norm(module)
class DFineConvEncoder(nn.Module):
    """
    Backbone wrapper that returns multi-scale feature maps together with matching padding masks.

    The backbone is built with `load_backbone(config)`. When `config.freeze_backbone_batch_norms` is set, its
    nn.BatchNorm2d layers are swapped for DFineFrozenBatchNorm2d.

    https://github.com/lyuwenyu/RT-DETR/blob/main/DFine_pytorch/src/nn/backbone/presnet.py#L142
    """

    def __init__(self, config):
        super().__init__()

        net = load_backbone(config)
        if config.freeze_backbone_batch_norms:
            # swap every batch norm for its frozen counterpart, without tracking gradients
            with torch.no_grad():
                replace_batch_norm(net)

        self.model = net
        self.intermediate_channel_sizes = self.model.channels

    def forward(self, pixel_values: torch.Tensor, pixel_mask: torch.Tensor):
        """Return a list of `(feature_map, mask)` pairs, one per backbone feature map."""
        feature_maps = self.model(pixel_values).feature_maps

        # interpolate needs a leading batch axis and a float input; the same float mask serves every level
        float_mask = pixel_mask[None].float()

        def resized_mask(spatial_size):
            resized = nn.functional.interpolate(float_mask, size=spatial_size)
            return resized.to(torch.bool)[0]

        return [(feature_map, resized_mask(feature_map.shape[-2:])) for feature_map in feature_maps]
def get_contrastive_denoising_training_group(
    targets,
    num_classes,
    num_queries,
    class_embed,
    num_denoising_queries=100,
    label_noise_ratio=0.5,
    box_noise_scale=1.0,
):
    """
    Creates a contrastive denoising training group using ground-truth samples. It adds noise to labels and boxes.

    Args:
        targets (`list[dict]`):
            The target objects, each containing 'class_labels' and 'boxes' for objects in an image.
        num_classes (`int`):
            Total number of classes in the dataset. Also used as the padding class id.
        num_queries (`int`):
            Number of query slots in the transformer.
        class_embed (`callable`):
            A function or a model layer to embed class labels.
        num_denoising_queries (`int`, *optional*, defaults to 100):
            Number of denoising queries. It is only a budget: the number actually produced is
            `max_gt_num * 2 * num_groups`.
        label_noise_ratio (`float`, *optional*, defaults to 0.5):
            Ratio of noise applied to labels. A label is replaced with probability `label_noise_ratio * 0.5`.
        box_noise_scale (`float`, *optional*, defaults to 1.0):
            Scale of noise applied to bounding boxes.

    Returns:
        `tuple` comprising various elements:
        - **input_query_class** (`torch.FloatTensor`) --
          Class queries with applied label noise.
        - **input_query_bbox** (`torch.FloatTensor`) --
          Bounding box queries with applied box noise.
        - **attn_mask** (`torch.FloatTensor`) --
           Attention mask for separating denoising and reconstruction queries (0 where attention is allowed,
           `-inf` where it is blocked).
        - **denoising_meta_values** (`dict`) --
          Metadata including denoising positive indices, number of groups, and split sizes.

        All four elements are `None` when `num_denoising_queries <= 0` or when no image in the batch has a
        ground-truth object.
    """

    if num_denoising_queries <= 0:
        return None, None, None, None

    num_ground_truths = [len(t["class_labels"]) for t in targets]
    device = targets[0]["class_labels"].device

    max_gt_num = max(num_ground_truths)
    if max_gt_num == 0:
        return None, None, None, None

    # As many groups as fit in the budget, but always at least one.
    num_groups_denoising_queries = num_denoising_queries // max_gt_num
    num_groups_denoising_queries = 1 if num_groups_denoising_queries == 0 else num_groups_denoising_queries
    # pad gt to max_num of a batch
    batch_size = len(num_ground_truths)

    # Padding slots keep class id `num_classes` and an all-zero box.
    input_query_class = torch.full([batch_size, max_gt_num], num_classes, dtype=torch.int32, device=device)
    input_query_bbox = torch.zeros([batch_size, max_gt_num, 4], device=device)
    pad_gt_mask = torch.zeros([batch_size, max_gt_num], dtype=torch.bool, device=device)

    for i in range(batch_size):
        num_gt = num_ground_truths[i]
        if num_gt > 0:
            input_query_class[i, :num_gt] = targets[i]["class_labels"]
            input_query_bbox[i, :num_gt] = targets[i]["boxes"]
            pad_gt_mask[i, :num_gt] = 1
    # each group has positive and negative queries.
    input_query_class = input_query_class.tile([1, 2 * num_groups_denoising_queries])
    input_query_bbox = input_query_bbox.tile([1, 2 * num_groups_denoising_queries, 1])
    pad_gt_mask = pad_gt_mask.tile([1, 2 * num_groups_denoising_queries])
    # positive and negative mask
    # Within each group of 2 * max_gt_num slots, the second half is marked negative.
    negative_gt_mask = torch.zeros([batch_size, max_gt_num * 2, 1], device=device)
    negative_gt_mask[:, max_gt_num:] = 1
    negative_gt_mask = negative_gt_mask.tile([1, num_groups_denoising_queries, 1])
    positive_gt_mask = 1 - negative_gt_mask
    # contrastive denoising training positive index
    positive_gt_mask = positive_gt_mask.squeeze(-1) * pad_gt_mask
    # Column indices of the positive, non-padded slots, split back into one tensor per image.
    denoise_positive_idx = torch.nonzero(positive_gt_mask)[:, 1]
    denoise_positive_idx = torch.split(
        denoise_positive_idx, [n * num_groups_denoising_queries for n in num_ground_truths]
    )
    # total denoising queries
    num_denoising_queries = torch_int(max_gt_num * 2 * num_groups_denoising_queries)

    if label_noise_ratio > 0:
        mask = torch.rand_like(input_query_class, dtype=torch.float) < (label_noise_ratio * 0.5)
        # randomly put a new one here
        new_label = torch.randint_like(mask, 0, num_classes, dtype=input_query_class.dtype)
        # Only real (non-padded) slots may receive a random label.
        input_query_class = torch.where(mask & pad_gt_mask, new_label, input_query_class)

    if box_noise_scale > 0:
        known_bbox = center_to_corners_format(input_query_bbox)
        # Maximum shift per corner coordinate: half the box width/height, scaled by `box_noise_scale`.
        diff = torch.tile(input_query_bbox[..., 2:] * 0.5, [1, 1, 2]) * box_noise_scale
        rand_sign = torch.randint_like(input_query_bbox, 0, 2) * 2.0 - 1.0
        rand_part = torch.rand_like(input_query_bbox)
        # Negative queries draw their magnitude from [1, 2), positive ones from [0, 1).
        rand_part = (rand_part + 1.0) * negative_gt_mask + rand_part * (1 - negative_gt_mask)
        rand_part *= rand_sign
        known_bbox += rand_part * diff
        known_bbox.clip_(min=0.0, max=1.0)
        input_query_bbox = corners_to_center_format(known_bbox)
        input_query_bbox = inverse_sigmoid(input_query_bbox)

    input_query_class = class_embed(input_query_class)

    # The mask covers the denoising queries first, followed by the `num_queries` regular queries.
    target_size = num_denoising_queries + num_queries
    attn_mask = torch.full([target_size, target_size], 0, dtype=torch.float, device=device)
    # match query cannot see the reconstruction
    attn_mask[num_denoising_queries:, :num_denoising_queries] = -torch.inf

    # reconstructions cannot see each other
    for i in range(num_groups_denoising_queries):
        idx_block_start = max_gt_num * 2 * i
        idx_block_end = max_gt_num * 2 * (i + 1)
        # Block attention to every denoising group before and after group `i`.
        attn_mask[idx_block_start:idx_block_end, :idx_block_start] = -torch.inf
        attn_mask[idx_block_start:idx_block_end, idx_block_end:num_denoising_queries] = -torch.inf

    denoising_meta_values = {
        "dn_positive_idx": denoise_positive_idx,
        "dn_num_group": num_groups_denoising_queries,
        "dn_num_split": [num_denoising_queries, num_queries],
    }

    return input_query_class, input_query_bbox, attn_mask, denoising_meta_values
@auto_docstring(
    custom_intro="""
    RT-DETR Model (consisting of a backbone and encoder-decoder) outputting raw hidden states without any head on top.
    """
)
class DFineModel(DFinePreTrainedModel):
    def __init__(self, config: DFineConfig):
        super().__init__(config)

        # Create backbone
        self.backbone = DFineConvEncoder(config)
        intermediate_channel_sizes = self.backbone.intermediate_channel_sizes
        num_backbone_outs = len(config.decoder_in_channels)

        # One 1x1 conv + batch norm per backbone level, projecting to the encoder width.
        encoder_input_proj_list = []
        for level in range(num_backbone_outs):
            in_channels = intermediate_channel_sizes[level]
            encoder_input_proj_list.append(
                nn.Sequential(
                    nn.Conv2d(in_channels, config.encoder_hidden_dim, kernel_size=1, bias=False),
                    nn.BatchNorm2d(config.encoder_hidden_dim),
                )
            )
        self.encoder_input_proj = nn.ModuleList(encoder_input_proj_list)
        self.encoder = DFineHybridEncoder(config=config)

        # denoising part
        if config.num_denoising > 0:
            self.denoising_class_embed = nn.Embedding(
                config.num_labels + 1, config.d_model, padding_idx=config.num_labels
            )

        # decoder embedding
        if config.learn_initial_query:
            self.weight_embedding = nn.Embedding(config.num_queries, config.d_model)

        # encoder head
        self.enc_output = nn.Sequential(
            nn.Linear(config.d_model, config.d_model),
            nn.LayerNorm(config.d_model, eps=config.layer_norm_eps),
        )
        self.enc_score_head = nn.Linear(config.d_model, config.num_labels)
        self.enc_bbox_head = DFineMLPPredictionHead(config, config.d_model, config.d_model, 4, num_layers=3)

        # init encoder output anchors and valid_mask
        if config.anchor_image_size:
            self.anchors, self.valid_mask = self.generate_anchors(dtype=self.dtype)

        self.decoder = DFineDecoder(config)

        # Projections applied to the encoder outputs before they reach the decoder. They are identities when the
        # channel count already matches; extra feature levels use a strided 3x3 conv instead of a 1x1 conv.
        # (A second, never-registered copy of these layers used to be built here; it was dead code.)
        decoder_input_proj = []
        in_channels = config.decoder_in_channels[-1]
        needs_projection = config.hidden_size != config.decoder_in_channels[-1]
        for _ in range(num_backbone_outs):
            if not needs_projection:
                decoder_input_proj.append(nn.Identity())
            else:
                conv = nn.Conv2d(in_channels, config.d_model, kernel_size=1, bias=False)
                batchnorm = nn.BatchNorm2d(config.d_model, config.batch_norm_eps)
                decoder_input_proj.append(nn.Sequential(conv, batchnorm))
        for _ in range(config.num_feature_levels - num_backbone_outs):
            if not needs_projection:
                decoder_input_proj.append(nn.Identity())
            else:
                conv = nn.Conv2d(in_channels, config.d_model, kernel_size=3, stride=2, padding=1, bias=False)
                batchnorm = nn.BatchNorm2d(config.d_model, config.batch_norm_eps)
                decoder_input_proj.append(nn.Sequential(conv, batchnorm))
        self.decoder_input_proj = nn.ModuleList(decoder_input_proj)

        self.post_init()

    def get_encoder(self):
        """Return the hybrid encoder module."""
        return self.encoder

    def freeze_backbone(self):
        """Disable gradients for every backbone parameter."""
        for param in self.backbone.parameters():
            param.requires_grad_(False)

    def unfreeze_backbone(self):
        """Re-enable gradients for every backbone parameter."""
        for param in self.backbone.parameters():
            param.requires_grad_(True)

    @compile_compatible_method_lru_cache(maxsize=32)
    def generate_anchors(self, spatial_shapes=None, grid_size=0.05, device="cpu", dtype=torch.float32):
        """
        Build one anchor box per feature-map cell, expressed in logit space.

        Args:
            spatial_shapes (sequence of `(height, width)`, *optional*):
                Feature-map sizes, one per level. When omitted they are derived from
                `config.anchor_image_size` and `config.feat_strides`. Pass a tuple so the result can be cached.
            grid_size (`float`, *optional*, defaults to 0.05):
                Anchor width/height at level 0; it doubles at every further level.
            device: Device on which the anchors are created.
            dtype: Floating point type of the anchors.

        Returns:
            `(anchors, valid_mask)`: anchors hold `log(a / (1 - a))` of the normalized `(cx, cy, w, h)` boxes, with
            invalid anchors replaced by the largest finite value of `dtype`; `valid_mask` flags the anchors whose
            four coordinates all lie strictly inside `(eps, 1 - eps)`.
        """
        if spatial_shapes is None:
            spatial_shapes = [
                [int(self.config.anchor_image_size[0] / s), int(self.config.anchor_image_size[1] / s)]
                for s in self.config.feat_strides
            ]
        anchors = []
        for level, (height, width) in enumerate(spatial_shapes):
            grid_y, grid_x = torch.meshgrid(
                torch.arange(end=height, device=device).to(dtype),
                torch.arange(end=width, device=device).to(dtype),
                indexing="ij",
            )
            grid_xy = torch.stack([grid_x, grid_y], -1)
            # Cell centers, normalized to [0, 1].
            grid_xy = grid_xy.unsqueeze(0) + 0.5
            grid_xy[..., 0] /= width
            grid_xy[..., 1] /= height
            wh = torch.ones_like(grid_xy) * grid_size * (2.0**level)
            anchors.append(torch.concat([grid_xy, wh], -1).reshape(-1, height * width, 4))
        # define the valid range for anchor coordinates
        eps = 1e-2
        anchors = torch.concat(anchors, 1)
        valid_mask = ((anchors > eps) * (anchors < 1 - eps)).all(-1, keepdim=True)
        anchors = torch.log(anchors / (1 - anchors))
        anchors = torch.where(valid_mask, anchors, torch.tensor(torch.finfo(dtype).max, dtype=dtype, device=device))

        return anchors, valid_mask

    @auto_docstring
    def forward(
        self,
        pixel_values: torch.FloatTensor,
        pixel_mask: Optional[torch.LongTensor] = None,
        encoder_outputs: Optional[torch.FloatTensor] = None,
        inputs_embeds: Optional[torch.FloatTensor] = None,
        decoder_inputs_embeds: Optional[torch.FloatTensor] = None,
        labels: Optional[list[dict]] = None,
        output_attentions: Optional[bool] = None,
        output_hidden_states: Optional[bool] = None,
        return_dict: Optional[bool] = None,
    ) -> Union[tuple[torch.FloatTensor], DFineModelOutput]:
        r"""
        inputs_embeds (`torch.FloatTensor` of shape `(batch_size, sequence_length, hidden_size)`, *optional*):
            Optionally, instead of passing the flattened feature map (output of the backbone + projection layer), you
            can choose to directly pass a flattened representation of an image.
        decoder_inputs_embeds (`torch.FloatTensor` of shape `(batch_size, num_queries, hidden_size)`, *optional*):
            Optionally, instead of initializing the queries with a tensor of zeros, you can choose to directly pass an
            embedded representation.
        labels (`list[Dict]` of len `(batch_size,)`, *optional*):
            Labels for computing the bipartite matching loss. List of dicts, each dictionary containing at least the
            following 2 keys: 'class_labels' and 'boxes' (the class labels and bounding boxes of an image in the batch
            respectively). The class labels themselves should be a `torch.LongTensor` of len `(number of bounding boxes in the image,)` and the boxes a `torch.FloatTensor` of shape `(number of bounding boxes in the image, 4)`.

        Examples:

        ```python
        >>> from transformers import AutoImageProcessor, DFineModel
        >>> from PIL import Image
        >>> import requests

        >>> url = "http://images.cocodataset.org/val2017/000000039769.jpg"
        >>> image = Image.open(requests.get(url, stream=True).raw)

        >>> image_processor = AutoImageProcessor.from_pretrained("PekingU/DFine_r50vd")
        >>> model = DFineModel.from_pretrained("PekingU/DFine_r50vd")

        >>> inputs = image_processor(images=image, return_tensors="pt")

        >>> outputs = model(**inputs)

        >>> last_hidden_states = outputs.last_hidden_state
        >>> list(last_hidden_states.shape)
        [1, 300, 256]
        ```"""
        output_attentions = output_attentions if output_attentions is not None else self.config.output_attentions
        output_hidden_states = (
            output_hidden_states if output_hidden_states is not None else self.config.output_hidden_states
        )
        return_dict = return_dict if return_dict is not None else self.config.use_return_dict

        batch_size, num_channels, height, width = pixel_values.shape
        device = pixel_values.device

        if pixel_mask is None:
            pixel_mask = torch.ones(((batch_size, height, width)), device=device)

        features = self.backbone(pixel_values, pixel_mask)

        proj_feats = [self.encoder_input_proj[level](source) for level, (source, mask) in enumerate(features)]

        if encoder_outputs is None:
            encoder_outputs = self.encoder(
                proj_feats,
                output_attentions=output_attentions,
                output_hidden_states=output_hidden_states,
                return_dict=return_dict,
            )
        # If the user passed a tuple for encoder_outputs, we wrap it in a BaseModelOutput when return_dict=True
        elif return_dict and not isinstance(encoder_outputs, BaseModelOutput):
            encoder_outputs = BaseModelOutput(
                last_hidden_state=encoder_outputs[0],
                hidden_states=encoder_outputs[1] if output_hidden_states else None,
                attentions=encoder_outputs[2]
                if len(encoder_outputs) > 2
                else encoder_outputs[1]
                if output_attentions
                else None,
            )

        # Equivalent to def _get_encoder_input
        # https://github.com/lyuwenyu/RT-DETR/blob/94f5e16708329d2f2716426868ec89aa774af016/DFine_pytorch/src/zoo/DFine/DFine_decoder.py#L412
        sources = []
        for level, source in enumerate(encoder_outputs[0]):
            sources.append(self.decoder_input_proj[level](source))

        # Lowest resolution feature maps are obtained via 3x3 stride 2 convolutions on the final stage
        if self.config.num_feature_levels > len(sources):
            _len_sources = len(sources)
            # Project the last encoder feature map. The projection must receive that single map, not the whole
            # list of maps (indexing the projected list only worked when the projection was an identity).
            sources.append(self.decoder_input_proj[_len_sources](encoder_outputs[0][-1]))
            for i in range(_len_sources + 1, self.config.num_feature_levels):
                sources.append(self.decoder_input_proj[i](encoder_outputs[0][-1]))

        # Prepare encoder inputs (by flattening)
        source_flatten = []
        spatial_shapes_list = []
        spatial_shapes = torch.empty((len(sources), 2), device=device, dtype=torch.long)
        for level, source in enumerate(sources):
            height, width = source.shape[-2:]
            spatial_shapes[level, 0] = height
            spatial_shapes[level, 1] = width
            spatial_shapes_list.append((height, width))
            source = source.flatten(2).transpose(1, 2)
            source_flatten.append(source)
        source_flatten = torch.cat(source_flatten, 1)
        level_start_index = torch.cat((spatial_shapes.new_zeros((1,)), spatial_shapes.prod(1).cumsum(0)[:-1]))

        # prepare denoising training
        if self.training and self.config.num_denoising > 0 and labels is not None:
            (
                denoising_class,
                denoising_bbox_unact,
                attention_mask,
                denoising_meta_values,
            ) = get_contrastive_denoising_training_group(
                targets=labels,
                num_classes=self.config.num_labels,
                num_queries=self.config.num_queries,
                class_embed=self.denoising_class_embed,
                num_denoising_queries=self.config.num_denoising,
                label_noise_ratio=self.config.label_noise_ratio,
                box_noise_scale=self.config.box_noise_scale,
            )
        else:
            denoising_class, denoising_bbox_unact, attention_mask, denoising_meta_values = None, None, None, None

        batch_size = len(source_flatten)
        device = source_flatten.device
        dtype = source_flatten.dtype

        # prepare input for decoder
        if self.training or self.config.anchor_image_size is None:
            # Pass spatial_shapes as tuple to make it hashable and make sure
            # lru_cache is working for generate_anchors()
            spatial_shapes_tuple = tuple(spatial_shapes_list)
            anchors, valid_mask = self.generate_anchors(spatial_shapes_tuple, device=device, dtype=dtype)
        else:
            anchors, valid_mask = self.anchors, self.valid_mask
            anchors, valid_mask = anchors.to(device, dtype), valid_mask.to(device, dtype)

        # use the valid_mask to selectively retain values in the feature map where the mask is `True`
        memory = valid_mask.to(source_flatten.dtype) * source_flatten

        output_memory = self.enc_output(memory)

        enc_outputs_class = self.enc_score_head(output_memory)
        enc_outputs_coord_logits = self.enc_bbox_head(output_memory) + anchors

        # Keep the `num_queries` positions whose best class score is highest.
        _, topk_ind = torch.topk(enc_outputs_class.max(-1).values, self.config.num_queries, dim=1)

        reference_points_unact = enc_outputs_coord_logits.gather(
            dim=1, index=topk_ind.unsqueeze(-1).repeat(1, 1, enc_outputs_coord_logits.shape[-1])
        )

        enc_topk_bboxes = F.sigmoid(reference_points_unact)
        if denoising_bbox_unact is not None:
            reference_points_unact = torch.concat([denoising_bbox_unact, reference_points_unact], 1)

        enc_topk_logits = enc_outputs_class.gather(
            dim=1, index=topk_ind.unsqueeze(-1).repeat(1, 1, enc_outputs_class.shape[-1])
        )

        # extract region features
        if self.config.learn_initial_query:
            # `weight_embedding` is an nn.Embedding module; the learned queries are its weight matrix of shape
            # (num_queries, d_model), broadcast here over the batch.
            target = self.weight_embedding.weight.unsqueeze(0).tile([batch_size, 1, 1])
        else:
            target = output_memory.gather(dim=1, index=topk_ind.unsqueeze(-1).repeat(1, 1, output_memory.shape[-1]))
            target = target.detach()

        if denoising_class is not None:
            target = torch.concat([denoising_class, target], 1)

        init_reference_points = reference_points_unact.detach()

        # decoder
        decoder_outputs = self.decoder(
            inputs_embeds=target,
            encoder_hidden_states=source_flatten,
            encoder_attention_mask=attention_mask,
            reference_points=init_reference_points,
            spatial_shapes=spatial_shapes,
            spatial_shapes_list=spatial_shapes_list,
            level_start_index=level_start_index,
            output_attentions=output_attentions,
            output_hidden_states=output_hidden_states,
            return_dict=return_dict,
        )

        if not return_dict:
            enc_outputs = tuple(
                value
                for value in [enc_topk_logits, enc_topk_bboxes, enc_outputs_class, enc_outputs_coord_logits]
                if value is not None
            )
            # Always a 1-tuple, even when the value is None, so it stays the last element of the output tuple.
            dn_outputs = tuple(value if value is not None else None for value in [denoising_meta_values])
            tuple_outputs = decoder_outputs + encoder_outputs + (init_reference_points,) + enc_outputs + dn_outputs

            return tuple_outputs

        return DFineModelOutput(
            last_hidden_state=decoder_outputs.last_hidden_state,
            intermediate_hidden_states=decoder_outputs.intermediate_hidden_states,
            intermediate_logits=decoder_outputs.intermediate_logits,
            intermediate_reference_points=decoder_outputs.intermediate_reference_points,
            intermediate_predicted_corners=decoder_outputs.intermediate_predicted_corners,
            initial_reference_points=decoder_outputs.initial_reference_points,
            decoder_hidden_states=decoder_outputs.hidden_states,
            decoder_attentions=decoder_outputs.attentions,
            cross_attentions=decoder_outputs.cross_attentions,
            encoder_last_hidden_state=encoder_outputs.last_hidden_state,
            encoder_hidden_states=encoder_outputs.hidden_states,
            encoder_attentions=encoder_outputs.attentions,
            init_reference_points=init_reference_points,
            enc_topk_logits=enc_topk_logits,
            enc_topk_bboxes=enc_topk_bboxes,
            enc_outputs_class=enc_outputs_class,
            enc_outputs_coord_logits=enc_outputs_coord_logits,
            denoising_meta_values=denoising_meta_values,
        )
@dataclass
@auto_docstring(
    custom_intro="""
    Output type of [`DFineForObjectDetection`].
    """
)
class DFineObjectDetectionOutput(ModelOutput):
    r"""
    loss (`torch.FloatTensor` of shape `(1,)`, *optional*, returned when `labels` are provided):
        Total loss as a linear combination of a negative log-likelihood (cross-entropy) for class prediction and a
        bounding box loss. The latter is defined as a linear combination of the L1 loss and the generalized
        scale-invariant IoU loss.
    loss_dict (`Dict`, *optional*):
        A dictionary containing the individual losses. Useful for logging.
    logits (`torch.FloatTensor` of shape `(batch_size, num_queries, config.num_labels)`):
        Classification logits for all queries, taken from the last entry of the stacked intermediate logits.
    pred_boxes (`torch.FloatTensor` of shape `(batch_size, num_queries, 4)`):
        Normalized boxes coordinates for all queries, represented as (center_x, center_y, width, height). These
        values are normalized in [0, 1], relative to the size of each individual image in the batch (disregarding
        possible padding). You can use [`~DFineImageProcessor.post_process_object_detection`] to retrieve the
        unnormalized (absolute) bounding boxes.
    auxiliary_outputs (`list[Dict]`, *optional*):
        Optional, only returned when auxiliary losses are activated (i.e. `config.auxiliary_loss` is set to `True`)
        and labels are provided. It is a list of dictionaries containing the two above keys (`logits` and
        `pred_boxes`) for each decoder layer.
    last_hidden_state (`torch.FloatTensor` of shape `(batch_size, num_queries, hidden_size)`):
        Sequence of hidden-states at the output of the last layer of the decoder of the model.
    intermediate_hidden_states (`torch.FloatTensor` of shape `(batch_size, config.decoder_layers, num_queries, hidden_size)`):
        Stacked intermediate hidden states (output of each layer of the decoder).
    intermediate_logits (`torch.FloatTensor` of shape `(batch_size, config.decoder_layers, num_queries, config.num_labels)`):
        Stacked intermediate logits (logits of each layer of the decoder).
    intermediate_reference_points (`torch.FloatTensor` of shape `(batch_size, config.decoder_layers, num_queries, 4)`):
        Stacked intermediate reference points (reference points of each layer of the decoder).
    intermediate_predicted_corners (`torch.FloatTensor` of shape `(batch_size, config.decoder_layers, num_queries, 4)`):
        Stacked intermediate predicted corners (predicted corners of each layer of the decoder).
    initial_reference_points (`torch.FloatTensor` of shape `(batch_size, config.decoder_layers, num_queries, 4)`):
        Stacked initial reference points (initial reference points of each layer of the decoder).
    init_reference_points (`torch.FloatTensor` of shape `(batch_size, num_queries, 4)`):
        Initial reference points sent through the Transformer decoder.
    enc_topk_logits (`torch.FloatTensor` of shape `(batch_size, num_queries, config.num_labels)`):
        Class logits of the top-scoring encoder proposals, as produced by [`DFineModel`].
    enc_topk_bboxes (`torch.FloatTensor` of shape `(batch_size, num_queries, 4)`):
        Sigmoid-activated box coordinates of the top-scoring encoder proposals, as produced by [`DFineModel`].
    enc_outputs_class (`torch.FloatTensor` of shape `(batch_size, sequence_length, config.num_labels)`):
        Class logits predicted by the encoder for every position of the flattened feature maps. The top
        `config.num_queries` positions are picked from these scores.
    enc_outputs_coord_logits (`torch.FloatTensor` of shape `(batch_size, sequence_length, 4)`):
        Un-activated box coordinates predicted by the encoder for every position of the flattened feature maps.
    denoising_meta_values (`dict`):
        Extra dictionary for the denoising related values.
    """

    loss: Optional[torch.FloatTensor] = None
    loss_dict: Optional[dict] = None
    logits: Optional[torch.FloatTensor] = None
    pred_boxes: Optional[torch.FloatTensor] = None
    auxiliary_outputs: Optional[list[dict]] = None
    last_hidden_state: Optional[torch.FloatTensor] = None
    intermediate_hidden_states: Optional[torch.FloatTensor] = None
    intermediate_logits: Optional[torch.FloatTensor] = None
    intermediate_reference_points: Optional[torch.FloatTensor] = None
    intermediate_predicted_corners: Optional[torch.FloatTensor] = None
    initial_reference_points: Optional[torch.FloatTensor] = None
    decoder_hidden_states: Optional[tuple[torch.FloatTensor]] = None
    decoder_attentions: Optional[tuple[torch.FloatTensor]] = None
    cross_attentions: Optional[tuple[torch.FloatTensor]] = None
    encoder_last_hidden_state: Optional[torch.FloatTensor] = None
    encoder_hidden_states: Optional[tuple[torch.FloatTensor]] = None
    encoder_attentions: Optional[tuple[torch.FloatTensor]] = None
    # NOTE(review): annotated as a tuple here but as a single tensor on DFineModelOutput, where it is filled
    # with one tensor -- confirm which annotation is intended.
    init_reference_points: Optional[tuple[torch.FloatTensor]] = None
    enc_topk_logits: Optional[torch.FloatTensor] = None
    enc_topk_bboxes: Optional[torch.FloatTensor] = None
    enc_outputs_class: Optional[torch.FloatTensor] = None
    enc_outputs_coord_logits: Optional[torch.FloatTensor] = None
    denoising_meta_values: Optional[dict] = None
""" ) class DFineForObjectDetection(DFinePreTrainedModel): # When using clones, all layers > 0 will be clones, but layer 0 *is* required _tied_weights_keys = ["bbox_embed", "class_embed"] # We can't initialize the model on meta device as some weights are modified during the initialization _no_split_modules = None def __init__(self, config: DFineConfig): super().__init__(config) # D-FINE encoder-decoder model self.eval_idx = config.eval_idx if config.eval_idx >= 0 else config.decoder_layers + config.eval_idx self.model = DFineModel(config) scaled_dim = round(config.layer_scale * config.hidden_size) num_pred = config.decoder_layers self.class_embed = nn.ModuleList([nn.Linear(config.d_model, config.num_labels) for _ in range(num_pred)]) self.bbox_embed = nn.ModuleList( [ DFineMLP(config.hidden_size, config.hidden_size, 4 * (config.max_num_bins + 1), 3) for _ in range(self.eval_idx + 1) ] + [ DFineMLP(scaled_dim, scaled_dim, 4 * (config.max_num_bins + 1), 3) for _ in range(config.decoder_layers - self.eval_idx - 1) ] ) # here self.model.decoder.bbox_embed is null, but not self.bbox_embed self.model.decoder.class_embed = self.class_embed self.model.decoder.bbox_embed = self.bbox_embed # Initialize weights and apply final processing self.post_init() @torch.jit.unused def _set_aux_loss(self, outputs_class, outputs_coord): # this is a workaround to make torchscript happy, as torchscript # doesn't support dictionary with non-homogeneous values, such # as a dict having both a Tensor and a list. 
return [{"logits": a, "pred_boxes": b} for a, b in zip(outputs_class, outputs_coord)] @auto_docstring def forward( self, pixel_values: torch.FloatTensor, pixel_mask: Optional[torch.LongTensor] = None, encoder_outputs: Optional[torch.FloatTensor] = None, inputs_embeds: Optional[torch.FloatTensor] = None, decoder_inputs_embeds: Optional[torch.FloatTensor] = None, labels: Optional[list[dict]] = None, output_attentions: Optional[bool] = None, output_hidden_states: Optional[bool] = None, return_dict: Optional[bool] = None, **kwargs, ) -> Union[tuple[torch.FloatTensor], DFineObjectDetectionOutput]: r""" Example: ```python >>> import torch >>> from transformers.image_utils import load_image >>> from transformers import AutoImageProcessor, DFineForObjectDetection >>> url = "http://images.cocodataset.org/val2017/000000039769.jpg" >>> image = load_image(url) >>> image_processor = AutoImageProcessor.from_pretrained("ustc-community/dfine-xlarge-coco") >>> model = DFineForObjectDetection.from_pretrained("ustc-community/dfine-xlarge-coco") >>> # prepare image for the model >>> inputs = image_processor(images=image, return_tensors="pt") >>> # forward pass >>> outputs = model(**inputs) >>> logits = outputs.logits >>> list(logits.shape) [1, 300, 80] >>> boxes = outputs.pred_boxes >>> list(boxes.shape) [1, 300, 4] >>> # convert outputs (bounding boxes and class logits) to Pascal VOC format (xmin, ymin, xmax, ymax) >>> target_sizes = torch.tensor([image.size[::-1]]) >>> results = image_processor.post_process_object_detection(outputs, threshold=0.9, target_sizes=target_sizes) >>> result = results[0] # first image in batch >>> for score, label, box in zip(result["scores"], result["labels"], result["boxes"]): ... box = [round(i, 2) for i in box.tolist()] ... print( ... f"Detected {model.config.id2label[label.item()]} with confidence " ... f"{round(score.item(), 3)} at location {box}" ... 
) Detected cat with confidence 0.958 at location [344.49, 23.4, 639.84, 374.27] Detected cat with confidence 0.956 at location [11.71, 53.52, 316.64, 472.33] Detected remote with confidence 0.947 at location [40.46, 73.7, 175.62, 117.57] Detected sofa with confidence 0.918 at location [0.59, 1.88, 640.25, 474.74] ``` """ output_attentions = output_attentions if output_attentions is not None else self.config.output_attentions output_hidden_states = ( output_hidden_states if output_hidden_states is not None else self.config.output_hidden_states ) return_dict = return_dict if return_dict is not None else self.config.use_return_dict outputs = self.model( pixel_values, pixel_mask=pixel_mask, encoder_outputs=encoder_outputs, inputs_embeds=inputs_embeds, decoder_inputs_embeds=decoder_inputs_embeds, labels=labels, output_attentions=output_attentions, output_hidden_states=output_hidden_states, return_dict=return_dict, ) denoising_meta_values = ( outputs.denoising_meta_values if return_dict else outputs[-1] if self.training else None ) outputs_class = outputs.intermediate_logits if return_dict else outputs[2] outputs_coord = outputs.intermediate_reference_points if return_dict else outputs[3] predicted_corners = outputs.intermediate_predicted_corners if return_dict else outputs[4] initial_reference_points = outputs.initial_reference_points if return_dict else outputs[5] logits = outputs_class[:, -1] pred_boxes = outputs_coord[:, -1] loss, loss_dict, auxiliary_outputs, enc_topk_logits, enc_topk_bboxes = None, None, None, None, None if labels is not None: enc_topk_logits = outputs.enc_topk_logits if return_dict else outputs[-5] enc_topk_bboxes = outputs.enc_topk_bboxes if return_dict else outputs[-4] loss, loss_dict, auxiliary_outputs = self.loss_function( logits, labels, self.device, pred_boxes, self.config, outputs_class, outputs_coord, enc_topk_logits=enc_topk_logits, enc_topk_bboxes=enc_topk_bboxes, denoising_meta_values=denoising_meta_values, 
predicted_corners=predicted_corners, initial_reference_points=initial_reference_points, **kwargs, ) if not return_dict: if auxiliary_outputs is not None: output = (logits, pred_boxes) + (auxiliary_outputs,) + outputs else: output = (logits, pred_boxes) + outputs return ((loss, loss_dict) + output) if loss is not None else output return DFineObjectDetectionOutput( loss=loss, loss_dict=loss_dict, logits=logits, pred_boxes=pred_boxes, auxiliary_outputs=auxiliary_outputs, last_hidden_state=outputs.last_hidden_state, intermediate_hidden_states=outputs.intermediate_hidden_states, intermediate_logits=outputs.intermediate_logits, intermediate_reference_points=outputs.intermediate_reference_points, intermediate_predicted_corners=outputs.intermediate_predicted_corners, initial_reference_points=outputs.initial_reference_points, decoder_hidden_states=outputs.decoder_hidden_states, decoder_attentions=outputs.decoder_attentions, cross_attentions=outputs.cross_attentions, encoder_last_hidden_state=outputs.encoder_last_hidden_state, encoder_hidden_states=outputs.encoder_hidden_states, encoder_attentions=outputs.encoder_attentions, init_reference_points=outputs.init_reference_points, enc_topk_logits=outputs.enc_topk_logits, enc_topk_bboxes=outputs.enc_topk_bboxes, enc_outputs_class=outputs.enc_outputs_class, enc_outputs_coord_logits=outputs.enc_outputs_coord_logits, denoising_meta_values=outputs.denoising_meta_values, ) # taken from https://github.com/facebookresearch/detr/blob/master/models/detr.py class DFineMLPPredictionHead(nn.Module): """ Very simple multi-layer perceptron (MLP, also called FFN), used to predict the normalized center coordinates, height and width of a bounding box w.r.t. an image. 
# taken from https://github.com/facebookresearch/detr/blob/master/models/detr.py
class DFineMLPPredictionHead(nn.Module):
    """
    Plain feed-forward network (MLP / FFN) used to predict the normalized center coordinates, height and width of a
    bounding box w.r.t. an image. A ReLU follows every linear layer except the last one.

    Copied from https://github.com/facebookresearch/detr/blob/master/models/detr.py
    Origin from https://github.com/lyuwenyu/RT-DETR/blob/94f5e16708329d2f2716426868ec89aa774af016/DFine_paddle/ppdet/modeling/transformers/utils.py#L453

    Args:
        config: accepted for signature compatibility; not read by this module.
        input_dim: size of the input features.
        d_model: width of every hidden layer.
        output_dim: size of the output features.
        num_layers: number of linear layers.
    """

    def __init__(self, config, input_dim, d_model, output_dim, num_layers):
        super().__init__()
        self.num_layers = num_layers
        # Consecutive entries of `widths` are the (in, out) sizes of consecutive linear layers.
        widths = [input_dim, *([d_model] * (num_layers - 1)), output_dim]
        self.layers = nn.ModuleList(nn.Linear(fan_in, fan_out) for fan_in, fan_out in zip(widths[:-1], widths[1:]))

    def forward(self, x):
        last_index = self.num_layers - 1
        for index, linear in enumerate(self.layers):
            x = linear(x)
            if index < last_index:
                x = nn.functional.relu(x)
        return x


class DFineMLP(nn.Module):
    """
    Multi-layer perceptron with a configurable activation between layers (none after the last layer).

    Args:
        input_dim: size of the input features.
        hidden_dim: width of every hidden layer.
        output_dim: size of the output features.
        num_layers: number of linear layers.
        act: key into `ACT2CLS` selecting the activation class.
    """

    def __init__(self, input_dim: int, hidden_dim: int, output_dim: int, num_layers: int, act: str = "relu"):
        super().__init__()
        self.num_layers = num_layers
        widths = [input_dim, *([hidden_dim] * (num_layers - 1)), output_dim]
        self.layers = nn.ModuleList(nn.Linear(fan_in, fan_out) for fan_in, fan_out in zip(widths[:-1], widths[1:]))
        self.act = ACT2CLS[act]()

    def forward(self, stat_features: torch.Tensor) -> torch.Tensor:
        last_index = self.num_layers - 1
        for index, linear in enumerate(self.layers):
            stat_features = linear(stat_features)
            if index < last_index:
                stat_features = self.act(stat_features)
        return stat_features


class DFineLQE(nn.Module):
    """
    Adds a learned quality score to the class scores.

    The corner logits are reshaped to `(batch, length, 4, max_num_bins + 1)` and soft-maxed over the bins; the
    `top_prob_values` largest probabilities per side plus their mean are fed to a small MLP whose single output is
    added (broadcast) to `scores`.
    """

    def __init__(self, config: DFineConfig):
        super().__init__()
        self.top_prob_values = config.top_prob_values
        self.max_num_bins = config.max_num_bins
        self.reg_conf = DFineMLP(4 * (self.top_prob_values + 1), config.lqe_hidden_dim, 1, config.lqe_layers)

    def forward(self, scores: torch.Tensor, pred_corners: torch.Tensor) -> torch.Tensor:
        batch_size, length, _ = pred_corners.size()
        bin_logits = pred_corners.reshape(batch_size, length, 4, self.max_num_bins + 1)
        bin_probs = F.softmax(bin_logits, dim=-1)
        top_probs = bin_probs.topk(self.top_prob_values, dim=-1).values
        mean_top_prob = top_probs.mean(dim=-1, keepdim=True)
        statistics = torch.cat([top_probs, mean_top_prob], dim=-1)
        quality_score = self.reg_conf(statistics.reshape(batch_size, length, -1))
        return scores + quality_score


class DFineConvNormLayer(nn.Module):
    """
    Bias-free `Conv2d` followed by `BatchNorm2d` and an optional activation.

    When `padding` is None it defaults to `(kernel_size - 1) // 2`; when `activation` is None an identity is used,
    otherwise the class is looked up in `ACT2CLS`.
    """

    def __init__(
        self,
        config: DFineConfig,
        in_channels: int,
        out_channels: int,
        kernel_size: int,
        stride: int,
        groups: int = 1,
        padding: Optional[int] = None,
        activation: Optional[str] = None,
    ):
        super().__init__()
        if padding is None:
            padding = (kernel_size - 1) // 2
        self.conv = nn.Conv2d(
            in_channels,
            out_channels,
            kernel_size,
            stride,
            groups=groups,
            padding=padding,
            bias=False,
        )
        self.norm = nn.BatchNorm2d(out_channels, config.batch_norm_eps)
        if activation is None:
            self.activation = nn.Identity()
        else:
            self.activation = ACT2CLS[activation]()

    def forward(self, hidden_state):
        return self.activation(self.norm(self.conv(hidden_state)))


class DFineRepVggBlock(nn.Module):
    """
    RepVGG architecture block introduced by the work "RepVGG: Making VGG-style ConvNets Great Again".

    Sums a 3x3 and a 1x1 conv-norm branch and applies `config.activation_function` to the result.
    """

    def __init__(self, config: DFineConfig, in_channels: int, out_channels: int):
        super().__init__()
        activation_name = config.activation_function
        self.conv1 = DFineConvNormLayer(config, in_channels, out_channels, 3, 1, padding=1)
        self.conv2 = DFineConvNormLayer(config, in_channels, out_channels, 1, 1, padding=0)
        if activation_name is None:
            self.activation = nn.Identity()
        else:
            self.activation = ACT2CLS[activation_name]()

    def forward(self, x):
        branch_3x3 = self.conv1(x)
        branch_1x1 = self.conv2(x)
        return self.activation(branch_3x3 + branch_1x1)


class DFineCSPRepLayer(nn.Module):
    """
    Cross Stage Partial (CSP) network layer with RepVGG blocks.

    One 1x1 branch runs through `num_blocks` RepVGG blocks, a second 1x1 branch acts as a shortcut; their sum is
    projected to `out_channels` by a third 1x1 conv only when the hidden width differs from `out_channels`.
    """

    def __init__(
        self, config: DFineConfig, in_channels: int, out_channels: int, num_blocks: int, expansion: float = 1.0
    ):
        super().__init__()
        activation = config.activation_function
        hidden_channels = int(out_channels * expansion)

        self.conv1 = DFineConvNormLayer(config, in_channels, hidden_channels, 1, 1, activation=activation)
        self.conv2 = DFineConvNormLayer(config, in_channels, hidden_channels, 1, 1, activation=activation)
        self.bottlenecks = nn.ModuleList(
            [DFineRepVggBlock(config, hidden_channels, hidden_channels) for _ in range(num_blocks)]
        )
        needs_projection = hidden_channels != out_channels
        self.conv3 = (
            DFineConvNormLayer(config, hidden_channels, out_channels, 1, 1, activation=activation)
            if needs_projection
            else nn.Identity()
        )

    def forward(self, hidden_state: torch.Tensor) -> torch.Tensor:
        main_branch = self.conv1(hidden_state)
        for block in self.bottlenecks:
            main_branch = block(main_branch)
        shortcut = self.conv2(hidden_state)
        return self.conv3(main_branch + shortcut)
class DFineRepNCSPELAN4(nn.Module):
    """
    Feature-fusion block built from two chained `DFineCSPRepLayer` + 3x3 conv stages.

    The input (expected to have `2 * encoder_hidden_dim` channels) is projected by a 1x1 conv and split into two
    halves; the second half is refined by the two stages in sequence, and both halves plus both stage outputs are
    concatenated and projected back to `encoder_hidden_dim` channels.
    """

    def __init__(self, config: DFineConfig, act: str = "silu", numb_blocks: int = 3):
        super().__init__()
        hidden_dim = config.encoder_hidden_dim
        input_dim = hidden_dim * 2
        output_dim = hidden_dim
        split_total_dim = hidden_dim * 2
        branch_dim = round(config.hidden_expansion * hidden_dim // 2)
        # Channel count of each half produced by the split in `forward`.
        self.conv_dim = split_total_dim // 2

        self.conv1 = DFineConvNormLayer(config, input_dim, split_total_dim, 1, 1, activation=act)
        self.csp_rep1 = DFineCSPRepLayer(config, split_total_dim // 2, branch_dim, num_blocks=numb_blocks)
        self.conv2 = DFineConvNormLayer(config, branch_dim, branch_dim, 3, 1, activation=act)
        self.csp_rep2 = DFineCSPRepLayer(config, branch_dim, branch_dim, num_blocks=numb_blocks)
        self.conv3 = DFineConvNormLayer(config, branch_dim, branch_dim, 3, 1, activation=act)
        self.conv4 = DFineConvNormLayer(config, split_total_dim + (2 * branch_dim), output_dim, 1, 1, activation=act)

    def forward(self, input_features: torch.Tensor) -> torch.Tensor:
        # Split the projected features into two equally sized channel groups.
        first_half, second_half = self.conv1(input_features).split((self.conv_dim, self.conv_dim), 1)

        # The two refinement stages are chained: stage 2 consumes the output of stage 1.
        stage1 = self.conv2(self.csp_rep1(second_half))
        stage2 = self.conv3(self.csp_rep2(stage1))

        fused = torch.cat([first_half, second_half, stage1, stage2], 1)
        return self.conv4(fused)


class DFineSCDown(nn.Module):
    """
    Down-sampling block: a 1x1 conv-norm followed by a conv-norm with the given `kernel_size` / `stride` whose
    group count equals `encoder_hidden_dim` (one group per channel).
    """

    def __init__(self, config: DFineConfig, kernel_size: int, stride: int):
        super().__init__()
        channels = config.encoder_hidden_dim
        self.conv1 = DFineConvNormLayer(config, channels, channels, 1, 1)
        self.conv2 = DFineConvNormLayer(config, channels, channels, kernel_size, stride, groups=channels)

    def forward(self, input_features: torch.Tensor) -> torch.Tensor:
        return self.conv2(self.conv1(input_features))


class DFineEncoderLayer(nn.Module):
    """
    Transformer encoder layer: self-attention and a two-layer feed-forward network, each with dropout, a residual
    connection and a LayerNorm that runs before (`normalize_before=True`) or after the sub-block.
    """

    def __init__(self, config: DFineConfig):
        super().__init__()
        self.normalize_before = config.normalize_before

        # self-attention
        self.self_attn = DFineMultiheadAttention(
            embed_dim=config.encoder_hidden_dim,
            num_heads=config.num_attention_heads,
            dropout=config.dropout,
        )
        self.self_attn_layer_norm = nn.LayerNorm(config.encoder_hidden_dim, eps=config.layer_norm_eps)
        self.dropout = config.dropout

        # feed-forward network
        self.activation_fn = ACT2FN[config.encoder_activation_function]
        self.activation_dropout = config.activation_dropout
        self.fc1 = nn.Linear(config.encoder_hidden_dim, config.encoder_ffn_dim)
        self.fc2 = nn.Linear(config.encoder_ffn_dim, config.encoder_hidden_dim)
        self.final_layer_norm = nn.LayerNorm(config.encoder_hidden_dim, eps=config.layer_norm_eps)

    def forward(
        self,
        hidden_states: torch.Tensor,
        attention_mask: torch.Tensor,
        position_embeddings: Optional[torch.Tensor] = None,
        output_attentions: bool = False,
        **kwargs,
    ):
        """
        Args:
            hidden_states (`torch.FloatTensor`): input to the layer of shape `(batch, seq_len, embed_dim)`
            attention_mask (`torch.FloatTensor`): attention mask of size
                `(batch, 1, target_len, source_len)` where padding elements are indicated by very large negative
                values.
            position_embeddings (`torch.FloatTensor`, *optional*):
                Object queries (also called content embeddings), to be added to the hidden states.
            output_attentions (`bool`, *optional*):
                Whether or not to return the attentions tensors of all attention layers. See `attentions` under
                returned tensors for more detail.

        Returns:
            `(hidden_states,)`, or `(hidden_states, attn_weights)` when `output_attentions` is set.
        """
        pre_norm = self.normalize_before

        # --- self-attention sub-block ---
        attn_input = self.self_attn_layer_norm(hidden_states) if pre_norm else hidden_states
        attn_output, attn_weights = self.self_attn(
            hidden_states=attn_input,
            attention_mask=attention_mask,
            position_embeddings=position_embeddings,
            output_attentions=output_attentions,
        )
        attn_output = nn.functional.dropout(attn_output, p=self.dropout, training=self.training)
        hidden_states = hidden_states + attn_output

        # Post-norm closes the attention sub-block here; pre-norm opens the feed-forward sub-block instead.
        # NOTE(review): in the pre-norm path the feed-forward residual is the *normalized* tensor, because the
        # residual is captured after `final_layer_norm`. Kept as-is; confirm this is intended.
        if pre_norm:
            hidden_states = self.final_layer_norm(hidden_states)
        else:
            hidden_states = self.self_attn_layer_norm(hidden_states)

        # --- feed-forward sub-block ---
        ffn_residual = hidden_states
        ffn_output = self.activation_fn(self.fc1(hidden_states))
        ffn_output = nn.functional.dropout(ffn_output, p=self.activation_dropout, training=self.training)
        ffn_output = self.fc2(ffn_output)
        ffn_output = nn.functional.dropout(ffn_output, p=self.dropout, training=self.training)
        hidden_states = ffn_residual + ffn_output
        if not pre_norm:
            hidden_states = self.final_layer_norm(hidden_states)

        # While training, clamp to just below the dtype maximum when inf/nan values show up.
        if self.training and (torch.isinf(hidden_states).any() or torch.isnan(hidden_states).any()):
            clamp_value = torch.finfo(hidden_states.dtype).max - 1000
            hidden_states = torch.clamp(hidden_states, min=-clamp_value, max=clamp_value)

        if output_attentions:
            return (hidden_states, attn_weights)
        return (hidden_states,)
class DFineEncoder(nn.Module):
    """
    Stack of `config.encoder_layers` `DFineEncoderLayer` modules applied in sequence.
    """

    def __init__(self, config: DFineConfig):
        super().__init__()

        self.layers = nn.ModuleList([DFineEncoderLayer(config) for _ in range(config.encoder_layers)])

    def forward(
        self, src, src_mask=None, pos_embed=None, output_attentions: bool = False
    ) -> Union[torch.Tensor, tuple[torch.Tensor, ...]]:
        """
        Run `src` through every layer.

        Args:
            src: input hidden states handed to the first layer.
            src_mask: forwarded to every layer as `attention_mask`.
            pos_embed: forwarded to every layer as `position_embeddings`.
            output_attentions: forwarded to every layer.

        Returns:
            The output tuple of the **last** layer, i.e. `(hidden_states,)` or `(hidden_states, attn_weights)`;
            attention weights of earlier layers are not collected. If the stack is empty, `src` is returned
            unchanged.
        """
        layer_outputs = src
        hidden_states = src
        for layer in self.layers:
            layer_outputs = layer(
                hidden_states,
                attention_mask=src_mask,
                position_embeddings=pos_embed,
                output_attentions=output_attentions,
            )
            # Each layer returns a tuple; only its first element is the tensor the next layer must consume.
            # (Feeding the whole tuple forward broke every configuration with more than one layer.)
            hidden_states = layer_outputs[0]
        return layer_outputs


class DFineHybridEncoder(nn.Module):
    """
    Hybrid encoder consisting of a set of `DFineEncoder` transformer stacks (applied to the feature levels listed in
    `config.encode_proj_layers`), a top-down Feature Pyramid Network (FPN) and a bottom-up Path Aggregation Network
    (PAN). More details on the paper: https://huggingface.co/papers/2304.08069

    Args:
        config: DFineConfig
    """

    def __init__(self, config: DFineConfig):
        super().__init__()
        self.config = config
        self.in_channels = config.encoder_in_channels
        self.num_fpn_stages = len(self.in_channels) - 1
        self.feat_strides = config.feat_strides
        self.encoder_hidden_dim = config.encoder_hidden_dim
        self.encode_proj_layers = config.encode_proj_layers
        self.positional_encoding_temperature = config.positional_encoding_temperature
        self.eval_size = config.eval_size
        self.out_channels = [self.encoder_hidden_dim for _ in self.in_channels]
        self.out_strides = self.feat_strides

        # Number of RepVGG blocks inside every fusion block (same for FPN and PAN).
        num_blocks = round(3 * config.depth_mult)

        # encoder transformer
        self.encoder = nn.ModuleList([DFineEncoder(config) for _ in range(len(self.encode_proj_layers))])

        # top-down fpn: one lateral conv + one fusion block per pair of adjacent levels
        self.lateral_convs = nn.ModuleList()
        self.fpn_blocks = nn.ModuleList()
        for _ in range(len(self.in_channels) - 1, 0, -1):
            lateral_layer = DFineConvNormLayer(config, self.encoder_hidden_dim, self.encoder_hidden_dim, 1, 1)
            self.lateral_convs.append(lateral_layer)
            fpn_layer = DFineRepNCSPELAN4(config, numb_blocks=num_blocks)
            self.fpn_blocks.append(fpn_layer)

        # bottom-up pan: one stride-2 down-sampling conv + one fusion block per pair of adjacent levels
        self.downsample_convs = nn.ModuleList()
        self.pan_blocks = nn.ModuleList()
        for _ in range(len(self.in_channels) - 1):
            self.downsample_convs.append(DFineSCDown(config, 3, 2))
            self.pan_blocks.append(DFineRepNCSPELAN4(config, numb_blocks=num_blocks))

    @staticmethod
    def build_2d_sincos_position_embedding(
        width, height, embed_dim=256, temperature=10000.0, device="cpu", dtype=torch.float32
    ):
        """
        Build a fixed 2D sine/cosine position embedding of shape `(1, width * height, embed_dim)`.

        The channel axis is the concatenation `[sin(w), cos(w), sin(h), cos(h)]`, each part `embed_dim // 4` wide.

        Raises:
            ValueError: if `embed_dim` is not divisible by 4.
        """
        grid_w = torch.arange(torch_int(width), device=device).to(dtype)
        grid_h = torch.arange(torch_int(height), device=device).to(dtype)
        grid_w, grid_h = torch.meshgrid(grid_w, grid_h, indexing="ij")
        if embed_dim % 4 != 0:
            raise ValueError("Embed dimension must be divisible by 4 for 2D sin-cos position embedding")
        pos_dim = embed_dim // 4
        omega = torch.arange(pos_dim, device=device).to(dtype) / pos_dim
        omega = 1.0 / (temperature**omega)

        out_w = grid_w.flatten()[..., None] @ omega[None]
        out_h = grid_h.flatten()[..., None] @ omega[None]

        return torch.concat([out_w.sin(), out_w.cos(), out_h.sin(), out_h.cos()], dim=1)[None, :, :]

    def forward(
        self,
        inputs_embeds=None,
        attention_mask=None,
        position_embeddings=None,
        spatial_shapes=None,
        level_start_index=None,
        valid_ratios=None,
        output_attentions=None,
        output_hidden_states=None,
        return_dict=None,
    ):
        r"""
        Args:
            inputs_embeds (sequence of `torch.FloatTensor`, one per feature level, each `(batch, channel, height, width)`):
                Projected backbone feature maps. The sequence is indexed per level and the levels listed in
                `encode_proj_layers` are **overwritten in place** with their transformer-encoded version.
            attention_mask (`torch.Tensor` of shape `(batch_size, sequence_length)`, *optional*):
                Mask to avoid performing attention on padding pixel features. Mask values selected in `[0, 1]`:
                - 1 for pixel features that are real (i.e. **not masked**),
                - 0 for pixel features that are padding (i.e. **masked**).
                [What are attention masks?](../glossary#attention-mask)
                Accepted for interface compatibility; not read by this implementation.
            position_embeddings (`torch.FloatTensor` of shape `(batch_size, sequence_length, hidden_size)`):
                Accepted for interface compatibility; not read by this implementation (position embeddings are
                built internally).
            spatial_shapes (`torch.LongTensor` of shape `(num_feature_levels, 2)`):
                Spatial shapes of each feature map. Not read by this implementation.
            level_start_index (`torch.LongTensor` of shape `(num_feature_levels)`):
                Starting index of each feature map. Not read by this implementation.
            valid_ratios (`torch.FloatTensor` of shape `(batch_size, num_feature_levels, 2)`):
                Ratio of valid area in each feature level. Not read by this implementation.
            output_attentions (`bool`, *optional*):
                Whether or not to return the attentions tensors of all attention layers. See `attentions` under
                returned tensors for more detail.
            output_hidden_states (`bool`, *optional*):
                Whether or not to return the hidden states of all layers. See `hidden_states` under returned
                tensors for more detail.
            return_dict (`bool`, *optional*):
                Whether or not to return a [`~file_utils.ModelOutput`] instead of a plain tuple.
        """
        output_attentions = output_attentions if output_attentions is not None else self.config.output_attentions
        output_hidden_states = (
            output_hidden_states if output_hidden_states is not None else self.config.output_hidden_states
        )
        return_dict = return_dict if return_dict is not None else self.config.use_return_dict

        hidden_states = inputs_embeds

        encoder_states = () if output_hidden_states else None
        all_attentions = () if output_attentions else None

        # encoder
        if self.config.encoder_layers > 0:
            for i, enc_ind in enumerate(self.encode_proj_layers):
                if output_hidden_states:
                    encoder_states = encoder_states + (hidden_states[enc_ind],)
                height, width = hidden_states[enc_ind].shape[2:]
                # flatten [batch, channel, height, width] to [batch, height*width, channel]
                src_flatten = hidden_states[enc_ind].flatten(2).permute(0, 2, 1)
                # NOTE(review): in eval mode with `eval_size` configured no position embedding is passed at all;
                # confirm this is intended rather than a missing cached embedding.
                if self.training or self.eval_size is None:
                    pos_embed = self.build_2d_sincos_position_embedding(
                        width,
                        height,
                        self.encoder_hidden_dim,
                        self.positional_encoding_temperature,
                        device=src_flatten.device,
                        dtype=src_flatten.dtype,
                    )
                else:
                    pos_embed = None

                layer_outputs = self.encoder[i](
                    src_flatten,
                    pos_embed=pos_embed,
                    output_attentions=output_attentions,
                )
                # Back to [batch, channel, height, width]; this replaces the level inside the input sequence.
                hidden_states[enc_ind] = (
                    layer_outputs[0].permute(0, 2, 1).reshape(-1, self.encoder_hidden_dim, height, width).contiguous()
                )

                if output_attentions:
                    all_attentions = all_attentions + (layer_outputs[1],)

            if output_hidden_states:
                encoder_states = encoder_states + (hidden_states[enc_ind],)

        # top-down FPN: start from the last level and fuse into progressively earlier levels
        fpn_feature_maps = [hidden_states[-1]]
        for idx, (lateral_conv, fpn_block) in enumerate(zip(self.lateral_convs, self.fpn_blocks)):
            backbone_feature_map = hidden_states[self.num_fpn_stages - idx - 1]
            top_fpn_feature_map = fpn_feature_maps[-1]
            # apply lateral block
            top_fpn_feature_map = lateral_conv(top_fpn_feature_map)
            fpn_feature_maps[-1] = top_fpn_feature_map
            # apply fpn block
            top_fpn_feature_map = F.interpolate(top_fpn_feature_map, scale_factor=2.0, mode="nearest")
            fused_feature_map = torch.concat([top_fpn_feature_map, backbone_feature_map], dim=1)
            new_fpn_feature_map = fpn_block(fused_feature_map)
            fpn_feature_maps.append(new_fpn_feature_map)

        # Reverse so that index 0 is the level produced last by the top-down pass.
        fpn_feature_maps = fpn_feature_maps[::-1]

        # bottom-up PAN
        pan_feature_maps = [fpn_feature_maps[0]]
        for idx, (downsample_conv, pan_block) in enumerate(zip(self.downsample_convs, self.pan_blocks)):
            top_pan_feature_map = pan_feature_maps[-1]
            fpn_feature_map = fpn_feature_maps[idx + 1]
            downsampled_feature_map = downsample_conv(top_pan_feature_map)
            fused_feature_map = torch.concat([downsampled_feature_map, fpn_feature_map], dim=1)
            new_pan_feature_map = pan_block(fused_feature_map)
            pan_feature_maps.append(new_pan_feature_map)

        if not return_dict:
            return tuple(v for v in [pan_feature_maps, encoder_states, all_attentions] if v is not None)
        return BaseModelOutput(
            last_hidden_state=pan_feature_maps, hidden_states=encoder_states, attentions=all_attentions
        )