# coding=utf-8 # Copyright 2024 Baidu Inc and The HuggingFace Inc. team. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. """PyTorch RT-DETR model.""" import math import warnings from dataclasses import dataclass from functools import partial from typing import Optional, Union import torch import torch.nn.functional as F from torch import Tensor, nn from ...activations import ACT2CLS, ACT2FN from ...image_transforms import center_to_corners_format, corners_to_center_format from ...integrations import use_kernel_forward_from_hub from ...modeling_outputs import BaseModelOutput from ...modeling_utils import PreTrainedModel from ...pytorch_utils import compile_compatible_method_lru_cache from ...utils import ( ModelOutput, auto_docstring, logging, torch_int, ) from ...utils.backbone_utils import load_backbone from .configuration_rt_detr import RTDetrConfig logger = logging.get_logger(__name__) # TODO: Replace all occurrences of the checkpoint with the final one @use_kernel_forward_from_hub("MultiScaleDeformableAttention") # Copied from transformers.models.deformable_detr.modeling_deformable_detr.MultiScaleDeformableAttention class MultiScaleDeformableAttention(nn.Module): def forward( self, value: Tensor, value_spatial_shapes: Tensor, value_spatial_shapes_list: list[tuple], level_start_index: Tensor, sampling_locations: Tensor, attention_weights: Tensor, im2col_step: int, ): batch_size, _, num_heads, hidden_dim = value.shape _, num_queries, num_heads, num_levels, num_points, _ = sampling_locations.shape value_list = value.split([height * width for height, width in value_spatial_shapes_list], dim=1) sampling_grids = 2 * sampling_locations - 1 sampling_value_list = [] for level_id, (height, width) in enumerate(value_spatial_shapes_list): # batch_size, height*width, num_heads, hidden_dim # -> batch_size, height*width, num_heads*hidden_dim # -> batch_size, num_heads*hidden_dim, height*width # -> batch_size*num_heads, hidden_dim, height, width value_l_ = ( value_list[level_id] .flatten(2) .transpose(1, 2) .reshape(batch_size * num_heads, hidden_dim, height, width) ) # batch_size, num_queries, num_heads, num_points, 2 # -> batch_size, num_heads, num_queries, num_points, 2 # -> batch_size*num_heads, num_queries, num_points, 2 sampling_grid_l_ = sampling_grids[:, :, :, level_id].transpose(1, 2).flatten(0, 1) # batch_size*num_heads, hidden_dim, num_queries, num_points sampling_value_l_ = nn.functional.grid_sample( value_l_, sampling_grid_l_, mode="bilinear", padding_mode="zeros", align_corners=False, ) sampling_value_list.append(sampling_value_l_) # (batch_size, num_queries, num_heads, num_levels, num_points) # -> (batch_size, num_heads, num_queries, num_levels, num_points) # -> (batch_size, num_heads, 1, num_queries, num_levels*num_points) attention_weights = attention_weights.transpose(1, 2).reshape( batch_size * num_heads, 1, num_queries, num_levels * num_points ) output = ( (torch.stack(sampling_value_list, dim=-2).flatten(-2) * attention_weights) .sum(-1) .view(batch_size, num_heads * hidden_dim, num_queries) ) return output.transpose(1, 2).contiguous() @dataclass @auto_docstring( custom_intro=""" Base class for outputs of the RTDetrDecoder. This class adds two attributes to BaseModelOutputWithCrossAttentions, namely: - a stacked tensor of intermediate decoder hidden states (i.e. the output of each decoder layer) - a stacked tensor of intermediate reference points. """ ) class RTDetrDecoderOutput(ModelOutput): r""" intermediate_hidden_states (`torch.FloatTensor` of shape `(batch_size, config.decoder_layers, num_queries, hidden_size)`): Stacked intermediate hidden states (output of each layer of the decoder). intermediate_logits (`torch.FloatTensor` of shape `(batch_size, config.decoder_layers, sequence_length, config.num_labels)`): Stacked intermediate logits (logits of each layer of the decoder). intermediate_reference_points (`torch.FloatTensor` of shape `(batch_size, config.decoder_layers, sequence_length, hidden_size)`): Stacked intermediate reference points (reference points of each layer of the decoder). intermediate_predicted_corners (`torch.FloatTensor` of shape `(batch_size, config.decoder_layers, num_queries, 4)`): Stacked intermediate predicted corners (predicted corners of each layer of the decoder). initial_reference_points (`torch.FloatTensor` of shape `(batch_size, config.decoder_layers, num_queries, 4)`): Stacked initial reference points (initial reference points of each layer of the decoder). cross_attentions (`tuple(torch.FloatTensor)`, *optional*, returned when `output_attentions=True` and `config.add_cross_attention=True` is passed or when `config.output_attentions=True`): Tuple of `torch.FloatTensor` (one for each layer) of shape `(batch_size, num_heads, sequence_length, sequence_length)`. Attentions weights of the decoder's cross-attention layer, after the attention softmax, used to compute the weighted average in the cross-attention heads. """ last_hidden_state: Optional[torch.FloatTensor] = None intermediate_hidden_states: Optional[torch.FloatTensor] = None intermediate_logits: Optional[torch.FloatTensor] = None intermediate_reference_points: Optional[torch.FloatTensor] = None intermediate_predicted_corners: Optional[torch.FloatTensor] = None initial_reference_points: Optional[torch.FloatTensor] = None hidden_states: Optional[tuple[torch.FloatTensor]] = None attentions: Optional[tuple[torch.FloatTensor]] = None cross_attentions: Optional[tuple[torch.FloatTensor]] = None @dataclass @auto_docstring( custom_intro=""" Base class for outputs of the RT-DETR encoder-decoder model. """ ) class RTDetrModelOutput(ModelOutput): r""" last_hidden_state (`torch.FloatTensor` of shape `(batch_size, num_queries, hidden_size)`): Sequence of hidden-states at the output of the last layer of the decoder of the model. intermediate_hidden_states (`torch.FloatTensor` of shape `(batch_size, config.decoder_layers, num_queries, hidden_size)`): Stacked intermediate hidden states (output of each layer of the decoder). intermediate_logits (`torch.FloatTensor` of shape `(batch_size, config.decoder_layers, sequence_length, config.num_labels)`): Stacked intermediate logits (logits of each layer of the decoder). intermediate_reference_points (`torch.FloatTensor` of shape `(batch_size, config.decoder_layers, num_queries, 4)`): Stacked intermediate reference points (reference points of each layer of the decoder). intermediate_predicted_corners (`torch.FloatTensor` of shape `(batch_size, config.decoder_layers, num_queries, 4)`): Stacked intermediate predicted corners (predicted corners of each layer of the decoder). initial_reference_points (`torch.FloatTensor` of shape `(batch_size, num_queries, 4)`): Initial reference points used for the first decoder layer. init_reference_points (`torch.FloatTensor` of shape `(batch_size, num_queries, 4)`): Initial reference points sent through the Transformer decoder. enc_topk_logits (`torch.FloatTensor` of shape `(batch_size, sequence_length, config.num_labels)`): Predicted bounding boxes scores where the top `config.two_stage_num_proposals` scoring bounding boxes are picked as region proposals in the encoder stage. Output of bounding box binary classification (i.e. foreground and background). enc_topk_bboxes (`torch.FloatTensor` of shape `(batch_size, sequence_length, 4)`): Logits of predicted bounding boxes coordinates in the encoder stage. enc_outputs_class (`torch.FloatTensor` of shape `(batch_size, sequence_length, config.num_labels)`, *optional*, returned when `config.with_box_refine=True` and `config.two_stage=True`): Predicted bounding boxes scores where the top `config.two_stage_num_proposals` scoring bounding boxes are picked as region proposals in the first stage. Output of bounding box binary classification (i.e. foreground and background). enc_outputs_coord_logits (`torch.FloatTensor` of shape `(batch_size, sequence_length, 4)`, *optional*, returned when `config.with_box_refine=True` and `config.two_stage=True`): Logits of predicted bounding boxes coordinates in the first stage. denoising_meta_values (`dict`): Extra dictionary for the denoising related values. """ last_hidden_state: Optional[torch.FloatTensor] = None intermediate_hidden_states: Optional[torch.FloatTensor] = None intermediate_logits: Optional[torch.FloatTensor] = None intermediate_reference_points: Optional[torch.FloatTensor] = None intermediate_predicted_corners: Optional[torch.FloatTensor] = None initial_reference_points: Optional[torch.FloatTensor] = None decoder_hidden_states: Optional[tuple[torch.FloatTensor]] = None decoder_attentions: Optional[tuple[torch.FloatTensor]] = None cross_attentions: Optional[tuple[torch.FloatTensor]] = None encoder_last_hidden_state: Optional[torch.FloatTensor] = None encoder_hidden_states: Optional[tuple[torch.FloatTensor]] = None encoder_attentions: Optional[tuple[torch.FloatTensor]] = None init_reference_points: Optional[torch.FloatTensor] = None enc_topk_logits: Optional[torch.FloatTensor] = None enc_topk_bboxes: Optional[torch.FloatTensor] = None enc_outputs_class: Optional[torch.FloatTensor] = None enc_outputs_coord_logits: Optional[torch.FloatTensor] = None denoising_meta_values: Optional[dict] = None @dataclass @auto_docstring( custom_intro=""" Output type of [`RTDetrForObjectDetection`]. """ ) class RTDetrObjectDetectionOutput(ModelOutput): r""" loss (`torch.FloatTensor` of shape `(1,)`, *optional*, returned when `labels` are provided)): Total loss as a linear combination of a negative log-likehood (cross-entropy) for class prediction and a bounding box loss. The latter is defined as a linear combination of the L1 loss and the generalized scale-invariant IoU loss. loss_dict (`Dict`, *optional*): A dictionary containing the individual losses. Useful for logging. logits (`torch.FloatTensor` of shape `(batch_size, num_queries, num_classes + 1)`): Classification logits (including no-object) for all queries. pred_boxes (`torch.FloatTensor` of shape `(batch_size, num_queries, 4)`): Normalized boxes coordinates for all queries, represented as (center_x, center_y, width, height). These values are normalized in [0, 1], relative to the size of each individual image in the batch (disregarding possible padding). You can use [`~RTDetrImageProcessor.post_process_object_detection`] to retrieve the unnormalized (absolute) bounding boxes. auxiliary_outputs (`list[Dict]`, *optional*): Optional, only returned when auxiliary losses are activated (i.e. `config.auxiliary_loss` is set to `True`) and labels are provided. It is a list of dictionaries containing the two above keys (`logits` and `pred_boxes`) for each decoder layer. last_hidden_state (`torch.FloatTensor` of shape `(batch_size, num_queries, hidden_size)`): Sequence of hidden-states at the output of the last layer of the decoder of the model. intermediate_hidden_states (`torch.FloatTensor` of shape `(batch_size, config.decoder_layers, num_queries, hidden_size)`): Stacked intermediate hidden states (output of each layer of the decoder). intermediate_logits (`torch.FloatTensor` of shape `(batch_size, config.decoder_layers, num_queries, config.num_labels)`): Stacked intermediate logits (logits of each layer of the decoder). intermediate_reference_points (`torch.FloatTensor` of shape `(batch_size, config.decoder_layers, num_queries, 4)`): Stacked intermediate reference points (reference points of each layer of the decoder). intermediate_predicted_corners (`torch.FloatTensor` of shape `(batch_size, config.decoder_layers, num_queries, 4)`): Stacked intermediate predicted corners (predicted corners of each layer of the decoder). initial_reference_points (`torch.FloatTensor` of shape `(batch_size, config.decoder_layers, num_queries, 4)`): Stacked initial reference points (initial reference points of each layer of the decoder). init_reference_points (`torch.FloatTensor` of shape `(batch_size, num_queries, 4)`): Initial reference points sent through the Transformer decoder. enc_topk_logits (`torch.FloatTensor` of shape `(batch_size, sequence_length, config.num_labels)`, *optional*, returned when `config.with_box_refine=True` and `config.two_stage=True`): Logits of predicted bounding boxes coordinates in the encoder. enc_topk_bboxes (`torch.FloatTensor` of shape `(batch_size, sequence_length, 4)`, *optional*, returned when `config.with_box_refine=True` and `config.two_stage=True`): Logits of predicted bounding boxes coordinates in the encoder. enc_outputs_class (`torch.FloatTensor` of shape `(batch_size, sequence_length, config.num_labels)`, *optional*, returned when `config.with_box_refine=True` and `config.two_stage=True`): Predicted bounding boxes scores where the top `config.two_stage_num_proposals` scoring bounding boxes are picked as region proposals in the first stage. Output of bounding box binary classification (i.e. foreground and background). enc_outputs_coord_logits (`torch.FloatTensor` of shape `(batch_size, sequence_length, 4)`, *optional*, returned when `config.with_box_refine=True` and `config.two_stage=True`): Logits of predicted bounding boxes coordinates in the first stage. denoising_meta_values (`dict`): Extra dictionary for the denoising related values """ loss: Optional[torch.FloatTensor] = None loss_dict: Optional[dict] = None logits: Optional[torch.FloatTensor] = None pred_boxes: Optional[torch.FloatTensor] = None auxiliary_outputs: Optional[list[dict]] = None last_hidden_state: Optional[torch.FloatTensor] = None intermediate_hidden_states: Optional[torch.FloatTensor] = None intermediate_logits: Optional[torch.FloatTensor] = None intermediate_reference_points: Optional[torch.FloatTensor] = None intermediate_predicted_corners: Optional[torch.FloatTensor] = None initial_reference_points: Optional[torch.FloatTensor] = None decoder_hidden_states: Optional[tuple[torch.FloatTensor]] = None decoder_attentions: Optional[tuple[torch.FloatTensor]] = None cross_attentions: Optional[tuple[torch.FloatTensor]] = None encoder_last_hidden_state: Optional[torch.FloatTensor] = None encoder_hidden_states: Optional[tuple[torch.FloatTensor]] = None encoder_attentions: Optional[tuple[torch.FloatTensor]] = None init_reference_points: Optional[tuple[torch.FloatTensor]] = None enc_topk_logits: Optional[torch.FloatTensor] = None enc_topk_bboxes: Optional[torch.FloatTensor] = None enc_outputs_class: Optional[torch.FloatTensor] = None enc_outputs_coord_logits: Optional[torch.FloatTensor] = None denoising_meta_values: Optional[dict] = None def _get_clones(partial_module, N): return nn.ModuleList([partial_module() for i in range(N)]) # Copied from transformers.models.conditional_detr.modeling_conditional_detr.inverse_sigmoid def inverse_sigmoid(x, eps=1e-5): x = x.clamp(min=0, max=1) x1 = x.clamp(min=eps) x2 = (1 - x).clamp(min=eps) return torch.log(x1 / x2) # Copied from transformers.models.detr.modeling_detr.DetrFrozenBatchNorm2d with Detr->RTDetr class RTDetrFrozenBatchNorm2d(nn.Module): """ BatchNorm2d where the batch statistics and the affine parameters are fixed. Copy-paste from torchvision.misc.ops with added eps before rqsrt, without which any other models than torchvision.models.resnet[18,34,50,101] produce nans. """ def __init__(self, n): super().__init__() self.register_buffer("weight", torch.ones(n)) self.register_buffer("bias", torch.zeros(n)) self.register_buffer("running_mean", torch.zeros(n)) self.register_buffer("running_var", torch.ones(n)) def _load_from_state_dict( self, state_dict, prefix, local_metadata, strict, missing_keys, unexpected_keys, error_msgs ): num_batches_tracked_key = prefix + "num_batches_tracked" if num_batches_tracked_key in state_dict: del state_dict[num_batches_tracked_key] super()._load_from_state_dict( state_dict, prefix, local_metadata, strict, missing_keys, unexpected_keys, error_msgs ) def forward(self, x): # move reshapes to the beginning # to make it user-friendly weight = self.weight.reshape(1, -1, 1, 1) bias = self.bias.reshape(1, -1, 1, 1) running_var = self.running_var.reshape(1, -1, 1, 1) running_mean = self.running_mean.reshape(1, -1, 1, 1) epsilon = 1e-5 scale = weight * (running_var + epsilon).rsqrt() bias = bias - running_mean * scale return x * scale + bias # Copied from transformers.models.detr.modeling_detr.replace_batch_norm with Detr->RTDetr def replace_batch_norm(model): r""" Recursively replace all `torch.nn.BatchNorm2d` with `RTDetrFrozenBatchNorm2d`. Args: model (torch.nn.Module): input model """ for name, module in model.named_children(): if isinstance(module, nn.BatchNorm2d): new_module = RTDetrFrozenBatchNorm2d(module.num_features) if module.weight.device != torch.device("meta"): new_module.weight.data.copy_(module.weight) new_module.bias.data.copy_(module.bias) new_module.running_mean.data.copy_(module.running_mean) new_module.running_var.data.copy_(module.running_var) model._modules[name] = new_module if len(list(module.children())) > 0: replace_batch_norm(module) def get_contrastive_denoising_training_group( targets, num_classes, num_queries, class_embed, num_denoising_queries=100, label_noise_ratio=0.5, box_noise_scale=1.0, ): """ Creates a contrastive denoising training group using ground-truth samples. It adds noise to labels and boxes. Args: targets (`list[dict]`): The target objects, each containing 'class_labels' and 'boxes' for objects in an image. num_classes (`int`): Total number of classes in the dataset. num_queries (`int`): Number of query slots in the transformer. class_embed (`callable`): A function or a model layer to embed class labels. num_denoising_queries (`int`, *optional*, defaults to 100): Number of denoising queries. label_noise_ratio (`float`, *optional*, defaults to 0.5): Ratio of noise applied to labels. box_noise_scale (`float`, *optional*, defaults to 1.0): Scale of noise applied to bounding boxes. Returns: `tuple` comprising various elements: - **input_query_class** (`torch.FloatTensor`) -- Class queries with applied label noise. - **input_query_bbox** (`torch.FloatTensor`) -- Bounding box queries with applied box noise. - **attn_mask** (`torch.FloatTensor`) -- Attention mask for separating denoising and reconstruction queries. - **denoising_meta_values** (`dict`) -- Metadata including denoising positive indices, number of groups, and split sizes. """ if num_denoising_queries <= 0: return None, None, None, None num_ground_truths = [len(t["class_labels"]) for t in targets] device = targets[0]["class_labels"].device max_gt_num = max(num_ground_truths) if max_gt_num == 0: return None, None, None, None num_groups_denoising_queries = num_denoising_queries // max_gt_num num_groups_denoising_queries = 1 if num_groups_denoising_queries == 0 else num_groups_denoising_queries # pad gt to max_num of a batch batch_size = len(num_ground_truths) input_query_class = torch.full([batch_size, max_gt_num], num_classes, dtype=torch.int32, device=device) input_query_bbox = torch.zeros([batch_size, max_gt_num, 4], device=device) pad_gt_mask = torch.zeros([batch_size, max_gt_num], dtype=torch.bool, device=device) for i in range(batch_size): num_gt = num_ground_truths[i] if num_gt > 0: input_query_class[i, :num_gt] = targets[i]["class_labels"] input_query_bbox[i, :num_gt] = targets[i]["boxes"] pad_gt_mask[i, :num_gt] = 1 # each group has positive and negative queries. input_query_class = input_query_class.tile([1, 2 * num_groups_denoising_queries]) input_query_bbox = input_query_bbox.tile([1, 2 * num_groups_denoising_queries, 1]) pad_gt_mask = pad_gt_mask.tile([1, 2 * num_groups_denoising_queries]) # positive and negative mask negative_gt_mask = torch.zeros([batch_size, max_gt_num * 2, 1], device=device) negative_gt_mask[:, max_gt_num:] = 1 negative_gt_mask = negative_gt_mask.tile([1, num_groups_denoising_queries, 1]) positive_gt_mask = 1 - negative_gt_mask # contrastive denoising training positive index positive_gt_mask = positive_gt_mask.squeeze(-1) * pad_gt_mask denoise_positive_idx = torch.nonzero(positive_gt_mask)[:, 1] denoise_positive_idx = torch.split( denoise_positive_idx, [n * num_groups_denoising_queries for n in num_ground_truths] ) # total denoising queries num_denoising_queries = torch_int(max_gt_num * 2 * num_groups_denoising_queries) if label_noise_ratio > 0: mask = torch.rand_like(input_query_class, dtype=torch.float) < (label_noise_ratio * 0.5) # randomly put a new one here new_label = torch.randint_like(mask, 0, num_classes, dtype=input_query_class.dtype) input_query_class = torch.where(mask & pad_gt_mask, new_label, input_query_class) if box_noise_scale > 0: known_bbox = center_to_corners_format(input_query_bbox) diff = torch.tile(input_query_bbox[..., 2:] * 0.5, [1, 1, 2]) * box_noise_scale rand_sign = torch.randint_like(input_query_bbox, 0, 2) * 2.0 - 1.0 rand_part = torch.rand_like(input_query_bbox) rand_part = (rand_part + 1.0) * negative_gt_mask + rand_part * (1 - negative_gt_mask) rand_part *= rand_sign known_bbox += rand_part * diff known_bbox.clip_(min=0.0, max=1.0) input_query_bbox = corners_to_center_format(known_bbox) input_query_bbox = inverse_sigmoid(input_query_bbox) input_query_class = class_embed(input_query_class) target_size = num_denoising_queries + num_queries attn_mask = torch.full([target_size, target_size], 0, dtype=torch.float, device=device) # match query cannot see the reconstruction attn_mask[num_denoising_queries:, :num_denoising_queries] = -torch.inf # reconstructions cannot see each other for i in range(num_groups_denoising_queries): idx_block_start = max_gt_num * 2 * i idx_block_end = max_gt_num * 2 * (i + 1) attn_mask[idx_block_start:idx_block_end, :idx_block_start] = -torch.inf attn_mask[idx_block_start:idx_block_end, idx_block_end:num_denoising_queries] = -torch.inf denoising_meta_values = { "dn_positive_idx": denoise_positive_idx, "dn_num_group": num_groups_denoising_queries, "dn_num_split": [num_denoising_queries, num_queries], } return input_query_class, input_query_bbox, attn_mask, denoising_meta_values class RTDetrConvEncoder(nn.Module): """ Convolutional backbone using the modeling_rt_detr_resnet.py. nn.BatchNorm2d layers are replaced by RTDetrFrozenBatchNorm2d as defined above. https://github.com/lyuwenyu/RT-DETR/blob/main/rtdetr_pytorch/src/nn/backbone/presnet.py#L142 """ def __init__(self, config): super().__init__() backbone = load_backbone(config) if config.freeze_backbone_batch_norms: # replace batch norm by frozen batch norm with torch.no_grad(): replace_batch_norm(backbone) self.model = backbone self.intermediate_channel_sizes = self.model.channels def forward(self, pixel_values: torch.Tensor, pixel_mask: torch.Tensor): # send pixel_values through the model to get list of feature maps features = self.model(pixel_values).feature_maps out = [] for feature_map in features: # downsample pixel_mask to match shape of corresponding feature_map mask = nn.functional.interpolate(pixel_mask[None].float(), size=feature_map.shape[-2:]).to(torch.bool)[0] out.append((feature_map, mask)) return out class RTDetrConvNormLayer(nn.Module): def __init__(self, config, in_channels, out_channels, kernel_size, stride, padding=None, activation=None): super().__init__() self.conv = nn.Conv2d( in_channels, out_channels, kernel_size, stride, padding=(kernel_size - 1) // 2 if padding is None else padding, bias=False, ) self.norm = nn.BatchNorm2d(out_channels, config.batch_norm_eps) self.activation = nn.Identity() if activation is None else ACT2CLS[activation]() def forward(self, hidden_state): hidden_state = self.conv(hidden_state) hidden_state = self.norm(hidden_state) hidden_state = self.activation(hidden_state) return hidden_state class RTDetrEncoderLayer(nn.Module): def __init__(self, config: RTDetrConfig): super().__init__() self.normalize_before = config.normalize_before # self-attention self.self_attn = RTDetrMultiheadAttention( embed_dim=config.encoder_hidden_dim, num_heads=config.num_attention_heads, dropout=config.dropout, ) self.self_attn_layer_norm = nn.LayerNorm(config.encoder_hidden_dim, eps=config.layer_norm_eps) self.dropout = config.dropout self.activation_fn = ACT2FN[config.encoder_activation_function] self.activation_dropout = config.activation_dropout self.fc1 = nn.Linear(config.encoder_hidden_dim, config.encoder_ffn_dim) self.fc2 = nn.Linear(config.encoder_ffn_dim, config.encoder_hidden_dim) self.final_layer_norm = nn.LayerNorm(config.encoder_hidden_dim, eps=config.layer_norm_eps) def forward( self, hidden_states: torch.Tensor, attention_mask: torch.Tensor, position_embeddings: Optional[torch.Tensor] = None, output_attentions: bool = False, **kwargs, ): """ Args: hidden_states (`torch.FloatTensor`): input to the layer of shape `(batch, seq_len, embed_dim)` attention_mask (`torch.FloatTensor`): attention mask of size `(batch, 1, target_len, source_len)` where padding elements are indicated by very large negative values. position_embeddings (`torch.FloatTensor`, *optional*): Object queries (also called content embeddings), to be added to the hidden states. output_attentions (`bool`, *optional*): Whether or not to return the attentions tensors of all attention layers. See `attentions` under returned tensors for more detail. """ residual = hidden_states if self.normalize_before: hidden_states = self.self_attn_layer_norm(hidden_states) hidden_states, attn_weights = self.self_attn( hidden_states=hidden_states, attention_mask=attention_mask, position_embeddings=position_embeddings, output_attentions=output_attentions, ) hidden_states = nn.functional.dropout(hidden_states, p=self.dropout, training=self.training) hidden_states = residual + hidden_states if not self.normalize_before: hidden_states = self.self_attn_layer_norm(hidden_states) if self.normalize_before: hidden_states = self.final_layer_norm(hidden_states) residual = hidden_states hidden_states = self.activation_fn(self.fc1(hidden_states)) hidden_states = nn.functional.dropout(hidden_states, p=self.activation_dropout, training=self.training) hidden_states = self.fc2(hidden_states) hidden_states = nn.functional.dropout(hidden_states, p=self.dropout, training=self.training) hidden_states = residual + hidden_states if not self.normalize_before: hidden_states = self.final_layer_norm(hidden_states) if self.training: if torch.isinf(hidden_states).any() or torch.isnan(hidden_states).any(): clamp_value = torch.finfo(hidden_states.dtype).max - 1000 hidden_states = torch.clamp(hidden_states, min=-clamp_value, max=clamp_value) outputs = (hidden_states,) if output_attentions: outputs += (attn_weights,) return outputs class RTDetrRepVggBlock(nn.Module): """ RepVGG architecture block introduced by the work "RepVGG: Making VGG-style ConvNets Great Again". """ def __init__(self, config: RTDetrConfig): super().__init__() activation = config.activation_function hidden_channels = int(config.encoder_hidden_dim * config.hidden_expansion) self.conv1 = RTDetrConvNormLayer(config, hidden_channels, hidden_channels, 3, 1, padding=1) self.conv2 = RTDetrConvNormLayer(config, hidden_channels, hidden_channels, 1, 1, padding=0) self.activation = nn.Identity() if activation is None else ACT2CLS[activation]() def forward(self, x): y = self.conv1(x) + self.conv2(x) return self.activation(y) class RTDetrCSPRepLayer(nn.Module): """ Cross Stage Partial (CSP) network layer with RepVGG blocks. """ def __init__(self, config: RTDetrConfig): super().__init__() in_channels = config.encoder_hidden_dim * 2 out_channels = config.encoder_hidden_dim num_blocks = 3 activation = config.activation_function hidden_channels = int(out_channels * config.hidden_expansion) self.conv1 = RTDetrConvNormLayer(config, in_channels, hidden_channels, 1, 1, activation=activation) self.conv2 = RTDetrConvNormLayer(config, in_channels, hidden_channels, 1, 1, activation=activation) self.bottlenecks = nn.Sequential(*[RTDetrRepVggBlock(config) for _ in range(num_blocks)]) if hidden_channels != out_channels: self.conv3 = RTDetrConvNormLayer(config, hidden_channels, out_channels, 1, 1, activation=activation) else: self.conv3 = nn.Identity() def forward(self, hidden_state): hidden_state_1 = self.conv1(hidden_state) hidden_state_1 = self.bottlenecks(hidden_state_1) hidden_state_2 = self.conv2(hidden_state) return self.conv3(hidden_state_1 + hidden_state_2) # Copied from transformers.models.deformable_detr.modeling_deformable_detr.DeformableDetrMultiscaleDeformableAttention with DeformableDetr->RTDetr class RTDetrMultiscaleDeformableAttention(nn.Module): """ Multiscale deformable attention as proposed in Deformable DETR. """ def __init__(self, config: RTDetrConfig, num_heads: int, n_points: int): super().__init__() self.attn = MultiScaleDeformableAttention() if config.d_model % num_heads != 0: raise ValueError( f"embed_dim (d_model) must be divisible by num_heads, but got {config.d_model} and {num_heads}" ) dim_per_head = config.d_model // num_heads # check if dim_per_head is power of 2 if not ((dim_per_head & (dim_per_head - 1) == 0) and dim_per_head != 0): warnings.warn( "You'd better set embed_dim (d_model) in RTDetrMultiscaleDeformableAttention to make the" " dimension of each attention head a power of 2 which is more efficient in the authors' CUDA" " implementation." ) self.im2col_step = 64 self.d_model = config.d_model self.n_levels = config.num_feature_levels self.n_heads = num_heads self.n_points = n_points self.sampling_offsets = nn.Linear(config.d_model, num_heads * self.n_levels * n_points * 2) self.attention_weights = nn.Linear(config.d_model, num_heads * self.n_levels * n_points) self.value_proj = nn.Linear(config.d_model, config.d_model) self.output_proj = nn.Linear(config.d_model, config.d_model) self.disable_custom_kernels = config.disable_custom_kernels def with_pos_embed(self, tensor: torch.Tensor, position_embeddings: Optional[Tensor]): return tensor if position_embeddings is None else tensor + position_embeddings def forward( self, hidden_states: torch.Tensor, attention_mask: Optional[torch.Tensor] = None, encoder_hidden_states=None, encoder_attention_mask=None, position_embeddings: Optional[torch.Tensor] = None, reference_points=None, spatial_shapes=None, spatial_shapes_list=None, level_start_index=None, output_attentions: bool = False, ): # add position embeddings to the hidden states before projecting to queries and keys if position_embeddings is not None: hidden_states = self.with_pos_embed(hidden_states, position_embeddings) batch_size, num_queries, _ = hidden_states.shape batch_size, sequence_length, _ = encoder_hidden_states.shape total_elements = sum(height * width for height, width in spatial_shapes_list) if total_elements != sequence_length: raise ValueError( "Make sure to align the spatial shapes with the sequence length of the encoder hidden states" ) value = self.value_proj(encoder_hidden_states) if attention_mask is not None: # we invert the attention_mask value = value.masked_fill(~attention_mask[..., None], float(0)) value = value.view(batch_size, sequence_length, self.n_heads, self.d_model // self.n_heads) sampling_offsets = self.sampling_offsets(hidden_states).view( batch_size, num_queries, self.n_heads, self.n_levels, self.n_points, 2 ) attention_weights = self.attention_weights(hidden_states).view( batch_size, num_queries, self.n_heads, self.n_levels * self.n_points ) attention_weights = F.softmax(attention_weights, -1).view( batch_size, num_queries, self.n_heads, self.n_levels, self.n_points ) # batch_size, num_queries, n_heads, n_levels, n_points, 2 num_coordinates = reference_points.shape[-1] if num_coordinates == 2: offset_normalizer = torch.stack([spatial_shapes[..., 1], spatial_shapes[..., 0]], -1) sampling_locations = ( reference_points[:, :, None, :, None, :] + sampling_offsets / offset_normalizer[None, None, None, :, None, :] ) elif num_coordinates == 4: sampling_locations = ( reference_points[:, :, None, :, None, :2] + sampling_offsets / self.n_points * reference_points[:, :, None, :, None, 2:] * 0.5 ) else: raise ValueError(f"Last dim of reference_points must be 2 or 4, but got {reference_points.shape[-1]}") output = self.attn( value, spatial_shapes, spatial_shapes_list, level_start_index, sampling_locations, attention_weights, self.im2col_step, ) output = self.output_proj(output) return output, attention_weights class RTDetrMultiheadAttention(nn.Module): """ Multi-headed attention from 'Attention Is All You Need' paper. Here, we add position embeddings to the queries and keys (as explained in the Deformable DETR paper). """ def __init__( self, embed_dim: int, num_heads: int, dropout: float = 0.0, bias: bool = True, ): super().__init__() self.embed_dim = embed_dim self.num_heads = num_heads self.dropout = dropout self.head_dim = embed_dim // num_heads if self.head_dim * num_heads != self.embed_dim: raise ValueError( f"embed_dim must be divisible by num_heads (got `embed_dim`: {self.embed_dim} and `num_heads`:" f" {num_heads})." ) self.scaling = self.head_dim**-0.5 self.k_proj = nn.Linear(embed_dim, embed_dim, bias=bias) self.v_proj = nn.Linear(embed_dim, embed_dim, bias=bias) self.q_proj = nn.Linear(embed_dim, embed_dim, bias=bias) self.out_proj = nn.Linear(embed_dim, embed_dim, bias=bias) def _reshape(self, tensor: torch.Tensor, seq_len: int, batch_size: int): return tensor.view(batch_size, seq_len, self.num_heads, self.head_dim).transpose(1, 2).contiguous() def with_pos_embed(self, tensor: torch.Tensor, position_embeddings: Optional[Tensor]): return tensor if position_embeddings is None else tensor + position_embeddings def forward( self, hidden_states: torch.Tensor, attention_mask: Optional[torch.Tensor] = None, position_embeddings: Optional[torch.Tensor] = None, output_attentions: bool = False, ) -> tuple[torch.Tensor, Optional[torch.Tensor], Optional[tuple[torch.Tensor]]]: """Input shape: Batch x Time x Channel""" batch_size, target_len, embed_dim = hidden_states.size() # add position embeddings to the hidden states before projecting to queries and keys if position_embeddings is not None: hidden_states_original = hidden_states hidden_states = self.with_pos_embed(hidden_states, position_embeddings) # get queries, keys and values query_states = self.q_proj(hidden_states) * self.scaling key_states = self._reshape(self.k_proj(hidden_states), -1, batch_size) value_states = self._reshape(self.v_proj(hidden_states_original), -1, batch_size) proj_shape = (batch_size * self.num_heads, -1, self.head_dim) query_states = self._reshape(query_states, target_len, batch_size).view(*proj_shape) key_states = key_states.view(*proj_shape) value_states = value_states.view(*proj_shape) source_len = key_states.size(1) attn_weights = torch.bmm(query_states, key_states.transpose(1, 2)) if attn_weights.size() != (batch_size * self.num_heads, target_len, source_len): raise ValueError( f"Attention weights should be of size {(batch_size * self.num_heads, target_len, source_len)}, but is" f" {attn_weights.size()}" ) # expand attention_mask if attention_mask is not None: # [seq_len, seq_len] -> [batch_size, 1, target_seq_len, source_seq_len] attention_mask = attention_mask.expand(batch_size, 1, *attention_mask.size()) if attention_mask is not None: if attention_mask.size() != (batch_size, 1, target_len, source_len): raise ValueError( f"Attention mask should be of size {(batch_size, 1, target_len, source_len)}, but is" f" {attention_mask.size()}" ) if attention_mask.dtype == torch.bool: attention_mask = torch.zeros_like(attention_mask, dtype=attn_weights.dtype).masked_fill_( attention_mask, -torch.inf ) attn_weights = attn_weights.view(batch_size, self.num_heads, target_len, source_len) + attention_mask attn_weights = attn_weights.view(batch_size * self.num_heads, target_len, source_len) attn_weights = nn.functional.softmax(attn_weights, dim=-1) if output_attentions: # this operation is a bit awkward, but it's required to # make sure that attn_weights keeps its gradient. # In order to do so, attn_weights have to reshaped # twice and have to be reused in the following attn_weights_reshaped = attn_weights.view(batch_size, self.num_heads, target_len, source_len) attn_weights = attn_weights_reshaped.view(batch_size * self.num_heads, target_len, source_len) else: attn_weights_reshaped = None attn_probs = nn.functional.dropout(attn_weights, p=self.dropout, training=self.training) attn_output = torch.bmm(attn_probs, value_states) if attn_output.size() != (batch_size * self.num_heads, target_len, self.head_dim): raise ValueError( f"`attn_output` should be of size {(batch_size, self.num_heads, target_len, self.head_dim)}, but is" f" {attn_output.size()}" ) attn_output = attn_output.view(batch_size, self.num_heads, target_len, self.head_dim) attn_output = attn_output.transpose(1, 2) attn_output = attn_output.reshape(batch_size, target_len, embed_dim) attn_output = self.out_proj(attn_output) return attn_output, attn_weights_reshaped class RTDetrDecoderLayer(nn.Module): def __init__(self, config: RTDetrConfig): super().__init__() # self-attention self.self_attn = RTDetrMultiheadAttention( embed_dim=config.d_model, num_heads=config.decoder_attention_heads, dropout=config.attention_dropout, ) self.dropout = config.dropout self.activation_fn = ACT2FN[config.decoder_activation_function] self.activation_dropout = config.activation_dropout self.self_attn_layer_norm = nn.LayerNorm(config.d_model, eps=config.layer_norm_eps) # cross-attention self.encoder_attn = RTDetrMultiscaleDeformableAttention( config, num_heads=config.decoder_attention_heads, n_points=config.decoder_n_points, ) self.encoder_attn_layer_norm = nn.LayerNorm(config.d_model, eps=config.layer_norm_eps) # feedforward neural networks self.fc1 = nn.Linear(config.d_model, config.decoder_ffn_dim) self.fc2 = nn.Linear(config.decoder_ffn_dim, config.d_model) self.final_layer_norm = nn.LayerNorm(config.d_model, eps=config.layer_norm_eps) def forward( self, hidden_states: torch.Tensor, position_embeddings: Optional[torch.Tensor] = None, reference_points=None, spatial_shapes=None, spatial_shapes_list=None, level_start_index=None, encoder_hidden_states: Optional[torch.Tensor] = None, encoder_attention_mask: Optional[torch.Tensor] = None, output_attentions: Optional[bool] = False, ): """ Args: hidden_states (`torch.FloatTensor`): Input to the layer of shape `(seq_len, batch, embed_dim)`. position_embeddings (`torch.FloatTensor`, *optional*): Position embeddings that are added to the queries and keys in the self-attention layer. reference_points (`torch.FloatTensor`, *optional*): Reference points. spatial_shapes (`torch.LongTensor`, *optional*): Spatial shapes. level_start_index (`torch.LongTensor`, *optional*): Level start index. encoder_hidden_states (`torch.FloatTensor`): cross attention input to the layer of shape `(seq_len, batch, embed_dim)` encoder_attention_mask (`torch.FloatTensor`): encoder attention mask of size `(batch, 1, target_len, source_len)` where padding elements are indicated by very large negative values. output_attentions (`bool`, *optional*): Whether or not to return the attentions tensors of all attention layers. See `attentions` under returned tensors for more detail. """ residual = hidden_states # Self Attention hidden_states, self_attn_weights = self.self_attn( hidden_states=hidden_states, attention_mask=encoder_attention_mask, position_embeddings=position_embeddings, output_attentions=output_attentions, ) hidden_states = nn.functional.dropout(hidden_states, p=self.dropout, training=self.training) hidden_states = residual + hidden_states hidden_states = self.self_attn_layer_norm(hidden_states) second_residual = hidden_states # Cross-Attention cross_attn_weights = None hidden_states, cross_attn_weights = self.encoder_attn( hidden_states=hidden_states, encoder_hidden_states=encoder_hidden_states, position_embeddings=position_embeddings, reference_points=reference_points, spatial_shapes=spatial_shapes, spatial_shapes_list=spatial_shapes_list, level_start_index=level_start_index, output_attentions=output_attentions, ) hidden_states = nn.functional.dropout(hidden_states, p=self.dropout, training=self.training) hidden_states = second_residual + hidden_states hidden_states = self.encoder_attn_layer_norm(hidden_states) # Fully Connected residual = hidden_states hidden_states = self.activation_fn(self.fc1(hidden_states)) hidden_states = nn.functional.dropout(hidden_states, p=self.activation_dropout, training=self.training) hidden_states = self.fc2(hidden_states) hidden_states = nn.functional.dropout(hidden_states, p=self.dropout, training=self.training) hidden_states = residual + hidden_states hidden_states = self.final_layer_norm(hidden_states) outputs = (hidden_states,) if output_attentions: outputs += (self_attn_weights, cross_attn_weights) return outputs @auto_docstring class RTDetrPreTrainedModel(PreTrainedModel): config: RTDetrConfig base_model_prefix = "rt_detr" main_input_name = "pixel_values" _no_split_modules = [r"RTDetrHybridEncoder", r"RTDetrDecoderLayer"] def _init_weights(self, module): """Initialize the weights""" if isinstance(module, (RTDetrForObjectDetection, RTDetrDecoder)): if module.class_embed is not None: for layer in module.class_embed: prior_prob = self.config.initializer_bias_prior_prob or 1 / (self.config.num_labels + 1) bias = float(-math.log((1 - prior_prob) / prior_prob)) nn.init.xavier_uniform_(layer.weight) nn.init.constant_(layer.bias, bias) if module.bbox_embed is not None: for layer in module.bbox_embed: nn.init.constant_(layer.layers[-1].weight, 0) nn.init.constant_(layer.layers[-1].bias, 0) elif isinstance(module, RTDetrMultiscaleDeformableAttention): nn.init.constant_(module.sampling_offsets.weight.data, 0.0) default_dtype = torch.get_default_dtype() thetas = torch.arange(module.n_heads, dtype=torch.int64).to(default_dtype) * ( 2.0 * math.pi / module.n_heads ) grid_init = torch.stack([thetas.cos(), thetas.sin()], -1) grid_init = ( (grid_init / grid_init.abs().max(-1, keepdim=True)[0]) .view(module.n_heads, 1, 1, 2) .repeat(1, module.n_levels, module.n_points, 1) ) for i in range(module.n_points): grid_init[:, :, i, :] *= i + 1 with torch.no_grad(): module.sampling_offsets.bias = nn.Parameter(grid_init.view(-1)) nn.init.constant_(module.attention_weights.weight.data, 0.0) nn.init.constant_(module.attention_weights.bias.data, 0.0) nn.init.xavier_uniform_(module.value_proj.weight.data) nn.init.constant_(module.value_proj.bias.data, 0.0) nn.init.xavier_uniform_(module.output_proj.weight.data) nn.init.constant_(module.output_proj.bias.data, 0.0) elif isinstance(module, RTDetrModel): prior_prob = self.config.initializer_bias_prior_prob or 1 / (self.config.num_labels + 1) bias = float(-math.log((1 - prior_prob) / prior_prob)) nn.init.xavier_uniform_(module.enc_score_head.weight) nn.init.constant_(module.enc_score_head.bias, bias) elif isinstance(module, (nn.Linear, nn.Conv2d, nn.BatchNorm2d)): module.weight.data.normal_(mean=0.0, std=self.config.initializer_range) if module.bias is not None: module.bias.data.zero_() elif isinstance(module, nn.LayerNorm): module.weight.data.fill_(1.0) module.bias.data.zero_() if hasattr(module, "weight_embedding") and self.config.learn_initial_query: nn.init.xavier_uniform_(module.weight_embedding.weight) if hasattr(module, "denoising_class_embed") and self.config.num_denoising > 0: nn.init.xavier_uniform_(module.denoising_class_embed.weight) class RTDetrEncoder(nn.Module): def __init__(self, config: RTDetrConfig): super().__init__() self.layers = nn.ModuleList([RTDetrEncoderLayer(config) for _ in range(config.encoder_layers)]) def forward(self, src, src_mask=None, pos_embed=None, output_attentions: bool = False) -> torch.Tensor: hidden_states = src for layer in self.layers: hidden_states = layer( hidden_states, attention_mask=src_mask, position_embeddings=pos_embed, output_attentions=output_attentions, ) return hidden_states class RTDetrHybridEncoder(nn.Module): """ Decoder consisting of a projection layer, a set of `RTDetrEncoder`, a top-down Feature Pyramid Network (FPN) and a bottom-up Path Aggregation Network (PAN). More details on the paper: https://huggingface.co/papers/2304.08069 Args: config: RTDetrConfig """ def __init__(self, config: RTDetrConfig): super().__init__() self.config = config self.in_channels = config.encoder_in_channels self.feat_strides = config.feat_strides self.encoder_hidden_dim = config.encoder_hidden_dim self.encode_proj_layers = config.encode_proj_layers self.positional_encoding_temperature = config.positional_encoding_temperature self.eval_size = config.eval_size self.out_channels = [self.encoder_hidden_dim for _ in self.in_channels] self.out_strides = self.feat_strides self.num_fpn_stages = len(self.in_channels) - 1 self.num_pan_stages = len(self.in_channels) - 1 activation = config.activation_function # encoder transformer self.encoder = nn.ModuleList([RTDetrEncoder(config) for _ in range(len(self.encode_proj_layers))]) # top-down FPN self.lateral_convs = nn.ModuleList() self.fpn_blocks = nn.ModuleList() for _ in range(self.num_fpn_stages): lateral_conv = RTDetrConvNormLayer( config, in_channels=self.encoder_hidden_dim, out_channels=self.encoder_hidden_dim, kernel_size=1, stride=1, activation=activation, ) fpn_block = RTDetrCSPRepLayer(config) self.lateral_convs.append(lateral_conv) self.fpn_blocks.append(fpn_block) # bottom-up PAN self.downsample_convs = nn.ModuleList() self.pan_blocks = nn.ModuleList() for _ in range(self.num_pan_stages): downsample_conv = RTDetrConvNormLayer( config, in_channels=self.encoder_hidden_dim, out_channels=self.encoder_hidden_dim, kernel_size=3, stride=2, activation=activation, ) pan_block = RTDetrCSPRepLayer(config) self.downsample_convs.append(downsample_conv) self.pan_blocks.append(pan_block) @staticmethod def build_2d_sincos_position_embedding( width, height, embed_dim=256, temperature=10000.0, device="cpu", dtype=torch.float32 ): grid_w = torch.arange(torch_int(width), device=device).to(dtype) grid_h = torch.arange(torch_int(height), device=device).to(dtype) grid_w, grid_h = torch.meshgrid(grid_w, grid_h, indexing="ij") if embed_dim % 4 != 0: raise ValueError("Embed dimension must be divisible by 4 for 2D sin-cos position embedding") pos_dim = embed_dim // 4 omega = torch.arange(pos_dim, device=device).to(dtype) / pos_dim omega = 1.0 / (temperature**omega) out_w = grid_w.flatten()[..., None] @ omega[None] out_h = grid_h.flatten()[..., None] @ omega[None] return torch.concat([out_w.sin(), out_w.cos(), out_h.sin(), out_h.cos()], dim=1)[None, :, :] def forward( self, inputs_embeds=None, attention_mask=None, position_embeddings=None, spatial_shapes=None, level_start_index=None, valid_ratios=None, output_attentions=None, output_hidden_states=None, return_dict=None, ): r""" Args: inputs_embeds (`torch.FloatTensor` of shape `(batch_size, sequence_length, hidden_size)`): Flattened feature map (output of the backbone + projection layer) that is passed to the encoder. attention_mask (`torch.Tensor` of shape `(batch_size, sequence_length)`, *optional*): Mask to avoid performing attention on padding pixel features. Mask values selected in `[0, 1]`: - 1 for pixel features that are real (i.e. **not masked**), - 0 for pixel features that are padding (i.e. **masked**). [What are attention masks?](../glossary#attention-mask) position_embeddings (`torch.FloatTensor` of shape `(batch_size, sequence_length, hidden_size)`): Position embeddings that are added to the queries and keys in each self-attention layer. spatial_shapes (`torch.LongTensor` of shape `(num_feature_levels, 2)`): Spatial shapes of each feature map. level_start_index (`torch.LongTensor` of shape `(num_feature_levels)`): Starting index of each feature map. valid_ratios (`torch.FloatTensor` of shape `(batch_size, num_feature_levels, 2)`): Ratio of valid area in each feature level. output_attentions (`bool`, *optional*): Whether or not to return the attentions tensors of all attention layers. See `attentions` under returned tensors for more detail. output_hidden_states (`bool`, *optional*): Whether or not to return the hidden states of all layers. See `hidden_states` under returned tensors for more detail. return_dict (`bool`, *optional*): Whether or not to return a [`~file_utils.ModelOutput`] instead of a plain tuple. """ output_attentions = output_attentions if output_attentions is not None else self.config.output_attentions output_hidden_states = ( output_hidden_states if output_hidden_states is not None else self.config.output_hidden_states ) return_dict = return_dict if return_dict is not None else self.config.use_return_dict hidden_states = inputs_embeds encoder_states = () if output_hidden_states else None all_attentions = () if output_attentions else None # encoder if self.config.encoder_layers > 0: for i, enc_ind in enumerate(self.encode_proj_layers): if output_hidden_states: encoder_states = encoder_states + (hidden_states[enc_ind],) height, width = hidden_states[enc_ind].shape[2:] # flatten [batch, channel, height, width] to [batch, height*width, channel] src_flatten = hidden_states[enc_ind].flatten(2).permute(0, 2, 1) if self.training or self.eval_size is None: pos_embed = self.build_2d_sincos_position_embedding( width, height, self.encoder_hidden_dim, self.positional_encoding_temperature, device=src_flatten.device, dtype=src_flatten.dtype, ) else: pos_embed = None layer_outputs = self.encoder[i]( src_flatten, pos_embed=pos_embed, output_attentions=output_attentions, ) hidden_states[enc_ind] = ( layer_outputs[0].permute(0, 2, 1).reshape(-1, self.encoder_hidden_dim, height, width).contiguous() ) if output_attentions: all_attentions = all_attentions + (layer_outputs[1],) if output_hidden_states: encoder_states = encoder_states + (hidden_states[enc_ind],) # top-down FPN fpn_feature_maps = [hidden_states[-1]] for idx, (lateral_conv, fpn_block) in enumerate(zip(self.lateral_convs, self.fpn_blocks)): backbone_feature_map = hidden_states[self.num_fpn_stages - idx - 1] top_fpn_feature_map = fpn_feature_maps[-1] # apply lateral block top_fpn_feature_map = lateral_conv(top_fpn_feature_map) fpn_feature_maps[-1] = top_fpn_feature_map # apply fpn block top_fpn_feature_map = F.interpolate(top_fpn_feature_map, scale_factor=2.0, mode="nearest") fused_feature_map = torch.concat([top_fpn_feature_map, backbone_feature_map], dim=1) new_fpn_feature_map = fpn_block(fused_feature_map) fpn_feature_maps.append(new_fpn_feature_map) fpn_feature_maps = fpn_feature_maps[::-1] # bottom-up PAN pan_feature_maps = [fpn_feature_maps[0]] for idx, (downsample_conv, pan_block) in enumerate(zip(self.downsample_convs, self.pan_blocks)): top_pan_feature_map = pan_feature_maps[-1] fpn_feature_map = fpn_feature_maps[idx + 1] downsampled_feature_map = downsample_conv(top_pan_feature_map) fused_feature_map = torch.concat([downsampled_feature_map, fpn_feature_map], dim=1) new_pan_feature_map = pan_block(fused_feature_map) pan_feature_maps.append(new_pan_feature_map) if not return_dict: return tuple(v for v in [pan_feature_maps, encoder_states, all_attentions] if v is not None) return BaseModelOutput( last_hidden_state=pan_feature_maps, hidden_states=encoder_states, attentions=all_attentions ) class RTDetrDecoder(RTDetrPreTrainedModel): def __init__(self, config: RTDetrConfig): super().__init__(config) self.dropout = config.dropout self.layers = nn.ModuleList([RTDetrDecoderLayer(config) for _ in range(config.decoder_layers)]) self.query_pos_head = RTDetrMLPPredictionHead(config, 4, 2 * config.d_model, config.d_model, num_layers=2) # hack implementation for iterative bounding box refinement and two-stage Deformable DETR self.bbox_embed = None self.class_embed = None # Initialize weights and apply final processing self.post_init() def forward( self, inputs_embeds=None, encoder_hidden_states=None, encoder_attention_mask=None, position_embeddings=None, reference_points=None, spatial_shapes=None, spatial_shapes_list=None, level_start_index=None, valid_ratios=None, output_attentions=None, output_hidden_states=None, return_dict=None, ): r""" Args: inputs_embeds (`torch.FloatTensor` of shape `(batch_size, num_queries, hidden_size)`): The query embeddings that are passed into the decoder. encoder_hidden_states (`torch.FloatTensor` of shape `(batch_size, sequence_length, hidden_size)`, *optional*): Sequence of hidden-states at the output of the last layer of the encoder. Used in the cross-attention of the decoder. encoder_attention_mask (`torch.LongTensor` of shape `(batch_size, sequence_length)`, *optional*): Mask to avoid performing cross-attention on padding pixel_values of the encoder. Mask values selected in `[0, 1]`: - 1 for pixels that are real (i.e. **not masked**), - 0 for pixels that are padding (i.e. **masked**). position_embeddings (`torch.FloatTensor` of shape `(batch_size, num_queries, hidden_size)`, *optional*): Position embeddings that are added to the queries and keys in each self-attention layer. reference_points (`torch.FloatTensor` of shape `(batch_size, num_queries, 4)` is `as_two_stage` else `(batch_size, num_queries, 2)` or , *optional*): Reference point in range `[0, 1]`, top-left (0,0), bottom-right (1, 1), including padding area. spatial_shapes (`torch.FloatTensor` of shape `(num_feature_levels, 2)`): Spatial shapes of the feature maps. level_start_index (`torch.LongTensor` of shape `(num_feature_levels)`, *optional*): Indexes for the start of each feature level. In range `[0, sequence_length]`. valid_ratios (`torch.FloatTensor` of shape `(batch_size, num_feature_levels, 2)`, *optional*): Ratio of valid area in each feature level. output_attentions (`bool`, *optional*): Whether or not to return the attentions tensors of all attention layers. See `attentions` under returned tensors for more detail. output_hidden_states (`bool`, *optional*): Whether or not to return the hidden states of all layers. See `hidden_states` under returned tensors for more detail. return_dict (`bool`, *optional*): Whether or not to return a [`~file_utils.ModelOutput`] instead of a plain tuple. """ output_attentions = output_attentions if output_attentions is not None else self.config.output_attentions output_hidden_states = ( output_hidden_states if output_hidden_states is not None else self.config.output_hidden_states ) return_dict = return_dict if return_dict is not None else self.config.use_return_dict if inputs_embeds is not None: hidden_states = inputs_embeds # decoder layers all_hidden_states = () if output_hidden_states else None all_self_attns = () if output_attentions else None all_cross_attentions = () if (output_attentions and encoder_hidden_states is not None) else None intermediate = () intermediate_reference_points = () intermediate_logits = () reference_points = F.sigmoid(reference_points) # https://github.com/lyuwenyu/RT-DETR/blob/94f5e16708329d2f2716426868ec89aa774af016/rtdetr_pytorch/src/zoo/rtdetr/rtdetr_decoder.py#L252 for idx, decoder_layer in enumerate(self.layers): reference_points_input = reference_points.unsqueeze(2) position_embeddings = self.query_pos_head(reference_points) if output_hidden_states: all_hidden_states += (hidden_states,) layer_outputs = decoder_layer( hidden_states, position_embeddings=position_embeddings, encoder_hidden_states=encoder_hidden_states, reference_points=reference_points_input, spatial_shapes=spatial_shapes, spatial_shapes_list=spatial_shapes_list, level_start_index=level_start_index, encoder_attention_mask=encoder_attention_mask, output_attentions=output_attentions, ) hidden_states = layer_outputs[0] # hack implementation for iterative bounding box refinement if self.bbox_embed is not None: predicted_corners = self.bbox_embed[idx](hidden_states) new_reference_points = F.sigmoid(predicted_corners + inverse_sigmoid(reference_points)) reference_points = new_reference_points.detach() intermediate += (hidden_states,) intermediate_reference_points += ( (new_reference_points,) if self.bbox_embed is not None else (reference_points,) ) if self.class_embed is not None: logits = self.class_embed[idx](hidden_states) intermediate_logits += (logits,) if output_attentions: all_self_attns += (layer_outputs[1],) if encoder_hidden_states is not None: all_cross_attentions += (layer_outputs[2],) # Keep batch_size as first dimension intermediate = torch.stack(intermediate, dim=1) intermediate_reference_points = torch.stack(intermediate_reference_points, dim=1) if self.class_embed is not None: intermediate_logits = torch.stack(intermediate_logits, dim=1) # add hidden states from the last decoder layer if output_hidden_states: all_hidden_states += (hidden_states,) if not return_dict: return tuple( v for v in [ hidden_states, intermediate, intermediate_logits, intermediate_reference_points, all_hidden_states, all_self_attns, all_cross_attentions, ] if v is not None ) return RTDetrDecoderOutput( last_hidden_state=hidden_states, intermediate_hidden_states=intermediate, intermediate_logits=intermediate_logits, intermediate_reference_points=intermediate_reference_points, hidden_states=all_hidden_states, attentions=all_self_attns, cross_attentions=all_cross_attentions, ) # taken from https://github.com/facebookresearch/detr/blob/master/models/detr.py class RTDetrMLPPredictionHead(nn.Module): """ Very simple multi-layer perceptron (MLP, also called FFN), used to predict the normalized center coordinates, height and width of a bounding box w.r.t. an image. Copied from https://github.com/facebookresearch/detr/blob/master/models/detr.py Origin from https://github.com/lyuwenyu/RT-DETR/blob/94f5e16708329d2f2716426868ec89aa774af016/rtdetr_paddle/ppdet/modeling/transformers/utils.py#L453 """ def __init__(self, config, input_dim, d_model, output_dim, num_layers): super().__init__() self.num_layers = num_layers h = [d_model] * (num_layers - 1) self.layers = nn.ModuleList(nn.Linear(n, k) for n, k in zip([input_dim] + h, h + [output_dim])) def forward(self, x): for i, layer in enumerate(self.layers): x = nn.functional.relu(layer(x)) if i < self.num_layers - 1 else layer(x) return x @auto_docstring( custom_intro=""" RT-DETR Model (consisting of a backbone and encoder-decoder) outputting raw hidden states without any head on top. """ ) class RTDetrModel(RTDetrPreTrainedModel): def __init__(self, config: RTDetrConfig): super().__init__(config) # Create backbone self.backbone = RTDetrConvEncoder(config) intermediate_channel_sizes = self.backbone.intermediate_channel_sizes # Create encoder input projection layers # https://github.com/lyuwenyu/RT-DETR/blob/94f5e16708329d2f2716426868ec89aa774af016/rtdetr_pytorch/src/zoo/rtdetr/hybrid_encoder.py#L212 num_backbone_outs = len(intermediate_channel_sizes) encoder_input_proj_list = [] for _ in range(num_backbone_outs): in_channels = intermediate_channel_sizes[_] encoder_input_proj_list.append( nn.Sequential( nn.Conv2d(in_channels, config.encoder_hidden_dim, kernel_size=1, bias=False), nn.BatchNorm2d(config.encoder_hidden_dim), ) ) self.encoder_input_proj = nn.ModuleList(encoder_input_proj_list) # Create encoder self.encoder = RTDetrHybridEncoder(config) # denoising part if config.num_denoising > 0: self.denoising_class_embed = nn.Embedding( config.num_labels + 1, config.d_model, padding_idx=config.num_labels ) # decoder embedding if config.learn_initial_query: self.weight_embedding = nn.Embedding(config.num_queries, config.d_model) # encoder head self.enc_output = nn.Sequential( nn.Linear(config.d_model, config.d_model), nn.LayerNorm(config.d_model, eps=config.layer_norm_eps), ) self.enc_score_head = nn.Linear(config.d_model, config.num_labels) self.enc_bbox_head = RTDetrMLPPredictionHead(config, config.d_model, config.d_model, 4, num_layers=3) # init encoder output anchors and valid_mask if config.anchor_image_size: self.anchors, self.valid_mask = self.generate_anchors(dtype=self.dtype) # Create decoder input projection layers # https://github.com/lyuwenyu/RT-DETR/blob/94f5e16708329d2f2716426868ec89aa774af016/rtdetr_pytorch/src/zoo/rtdetr/rtdetr_decoder.py#L412 num_backbone_outs = len(config.decoder_in_channels) decoder_input_proj_list = [] for _ in range(num_backbone_outs): in_channels = config.decoder_in_channels[_] decoder_input_proj_list.append( nn.Sequential( nn.Conv2d(in_channels, config.d_model, kernel_size=1, bias=False), nn.BatchNorm2d(config.d_model, config.batch_norm_eps), ) ) for _ in range(config.num_feature_levels - num_backbone_outs): decoder_input_proj_list.append( nn.Sequential( nn.Conv2d(in_channels, config.d_model, kernel_size=3, stride=2, padding=1, bias=False), nn.BatchNorm2d(config.d_model, config.batch_norm_eps), ) ) in_channels = config.d_model self.decoder_input_proj = nn.ModuleList(decoder_input_proj_list) # decoder self.decoder = RTDetrDecoder(config) self.post_init() def get_encoder(self): return self.encoder def freeze_backbone(self): for param in self.backbone.parameters(): param.requires_grad_(False) def unfreeze_backbone(self): for param in self.backbone.parameters(): param.requires_grad_(True) @compile_compatible_method_lru_cache(maxsize=32) def generate_anchors(self, spatial_shapes=None, grid_size=0.05, device="cpu", dtype=torch.float32): if spatial_shapes is None: spatial_shapes = [ [int(self.config.anchor_image_size[0] / s), int(self.config.anchor_image_size[1] / s)] for s in self.config.feat_strides ] anchors = [] for level, (height, width) in enumerate(spatial_shapes): grid_y, grid_x = torch.meshgrid( torch.arange(end=height, device=device).to(dtype), torch.arange(end=width, device=device).to(dtype), indexing="ij", ) grid_xy = torch.stack([grid_x, grid_y], -1) grid_xy = grid_xy.unsqueeze(0) + 0.5 grid_xy[..., 0] /= width grid_xy[..., 1] /= height wh = torch.ones_like(grid_xy) * grid_size * (2.0**level) anchors.append(torch.concat([grid_xy, wh], -1).reshape(-1, height * width, 4)) # define the valid range for anchor coordinates eps = 1e-2 anchors = torch.concat(anchors, 1) valid_mask = ((anchors > eps) * (anchors < 1 - eps)).all(-1, keepdim=True) anchors = torch.log(anchors / (1 - anchors)) anchors = torch.where(valid_mask, anchors, torch.tensor(torch.finfo(dtype).max, dtype=dtype, device=device)) return anchors, valid_mask @auto_docstring def forward( self, pixel_values: torch.FloatTensor, pixel_mask: Optional[torch.LongTensor] = None, encoder_outputs: Optional[torch.FloatTensor] = None, inputs_embeds: Optional[torch.FloatTensor] = None, decoder_inputs_embeds: Optional[torch.FloatTensor] = None, labels: Optional[list[dict]] = None, output_attentions: Optional[bool] = None, output_hidden_states: Optional[bool] = None, return_dict: Optional[bool] = None, ) -> Union[tuple[torch.FloatTensor], RTDetrModelOutput]: r""" inputs_embeds (`torch.FloatTensor` of shape `(batch_size, sequence_length, hidden_size)`, *optional*): Optionally, instead of passing the flattened feature map (output of the backbone + projection layer), you can choose to directly pass a flattened representation of an image. decoder_inputs_embeds (`torch.FloatTensor` of shape `(batch_size, num_queries, hidden_size)`, *optional*): Optionally, instead of initializing the queries with a tensor of zeros, you can choose to directly pass an embedded representation. labels (`list[Dict]` of len `(batch_size,)`, *optional*): Labels for computing the bipartite matching loss. List of dicts, each dictionary containing at least the following 2 keys: 'class_labels' and 'boxes' (the class labels and bounding boxes of an image in the batch respectively). The class labels themselves should be a `torch.LongTensor` of len `(number of bounding boxes in the image,)` and the boxes a `torch.FloatTensor` of shape `(number of bounding boxes in the image, 4)`. Examples: ```python >>> from transformers import AutoImageProcessor, RTDetrModel >>> from PIL import Image >>> import requests >>> url = "http://images.cocodataset.org/val2017/000000039769.jpg" >>> image = Image.open(requests.get(url, stream=True).raw) >>> image_processor = AutoImageProcessor.from_pretrained("PekingU/rtdetr_r50vd") >>> model = RTDetrModel.from_pretrained("PekingU/rtdetr_r50vd") >>> inputs = image_processor(images=image, return_tensors="pt") >>> outputs = model(**inputs) >>> last_hidden_states = outputs.last_hidden_state >>> list(last_hidden_states.shape) [1, 300, 256] ```""" output_attentions = output_attentions if output_attentions is not None else self.config.output_attentions output_hidden_states = ( output_hidden_states if output_hidden_states is not None else self.config.output_hidden_states ) return_dict = return_dict if return_dict is not None else self.config.use_return_dict batch_size, num_channels, height, width = pixel_values.shape device = pixel_values.device if pixel_mask is None: pixel_mask = torch.ones(((batch_size, height, width)), device=device) features = self.backbone(pixel_values, pixel_mask) proj_feats = [self.encoder_input_proj[level](source) for level, (source, mask) in enumerate(features)] if encoder_outputs is None: encoder_outputs = self.encoder( proj_feats, output_attentions=output_attentions, output_hidden_states=output_hidden_states, return_dict=return_dict, ) # If the user passed a tuple for encoder_outputs, we wrap it in a BaseModelOutput when return_dict=True elif return_dict and not isinstance(encoder_outputs, BaseModelOutput): encoder_outputs = BaseModelOutput( last_hidden_state=encoder_outputs[0], hidden_states=encoder_outputs[1] if output_hidden_states else None, attentions=encoder_outputs[2] if len(encoder_outputs) > 2 else encoder_outputs[1] if output_attentions else None, ) # Equivalent to def _get_encoder_input # https://github.com/lyuwenyu/RT-DETR/blob/94f5e16708329d2f2716426868ec89aa774af016/rtdetr_pytorch/src/zoo/rtdetr/rtdetr_decoder.py#L412 sources = [] for level, source in enumerate(encoder_outputs[0]): sources.append(self.decoder_input_proj[level](source)) # Lowest resolution feature maps are obtained via 3x3 stride 2 convolutions on the final stage if self.config.num_feature_levels > len(sources): _len_sources = len(sources) sources.append(self.decoder_input_proj[_len_sources](encoder_outputs[0])[-1]) for i in range(_len_sources + 1, self.config.num_feature_levels): sources.append(self.decoder_input_proj[i](encoder_outputs[0][-1])) # Prepare encoder inputs (by flattening) source_flatten = [] spatial_shapes_list = [] spatial_shapes = torch.empty((len(sources), 2), device=device, dtype=torch.long) for level, source in enumerate(sources): height, width = source.shape[-2:] spatial_shapes[level, 0] = height spatial_shapes[level, 1] = width spatial_shapes_list.append((height, width)) source = source.flatten(2).transpose(1, 2) source_flatten.append(source) source_flatten = torch.cat(source_flatten, 1) level_start_index = torch.cat((spatial_shapes.new_zeros((1,)), spatial_shapes.prod(1).cumsum(0)[:-1])) # prepare denoising training if self.training and self.config.num_denoising > 0 and labels is not None: ( denoising_class, denoising_bbox_unact, attention_mask, denoising_meta_values, ) = get_contrastive_denoising_training_group( targets=labels, num_classes=self.config.num_labels, num_queries=self.config.num_queries, class_embed=self.denoising_class_embed, num_denoising_queries=self.config.num_denoising, label_noise_ratio=self.config.label_noise_ratio, box_noise_scale=self.config.box_noise_scale, ) else: denoising_class, denoising_bbox_unact, attention_mask, denoising_meta_values = None, None, None, None batch_size = len(source_flatten) device = source_flatten.device dtype = source_flatten.dtype # prepare input for decoder if self.training or self.config.anchor_image_size is None: # Pass spatial_shapes as tuple to make it hashable and make sure # lru_cache is working for generate_anchors() spatial_shapes_tuple = tuple(spatial_shapes_list) anchors, valid_mask = self.generate_anchors(spatial_shapes_tuple, device=device, dtype=dtype) else: anchors, valid_mask = self.anchors, self.valid_mask anchors, valid_mask = anchors.to(device, dtype), valid_mask.to(device, dtype) # use the valid_mask to selectively retain values in the feature map where the mask is `True` memory = valid_mask.to(source_flatten.dtype) * source_flatten output_memory = self.enc_output(memory) enc_outputs_class = self.enc_score_head(output_memory) enc_outputs_coord_logits = self.enc_bbox_head(output_memory) + anchors _, topk_ind = torch.topk(enc_outputs_class.max(-1).values, self.config.num_queries, dim=1) reference_points_unact = enc_outputs_coord_logits.gather( dim=1, index=topk_ind.unsqueeze(-1).repeat(1, 1, enc_outputs_coord_logits.shape[-1]) ) enc_topk_bboxes = F.sigmoid(reference_points_unact) if denoising_bbox_unact is not None: reference_points_unact = torch.concat([denoising_bbox_unact, reference_points_unact], 1) enc_topk_logits = enc_outputs_class.gather( dim=1, index=topk_ind.unsqueeze(-1).repeat(1, 1, enc_outputs_class.shape[-1]) ) # extract region features if self.config.learn_initial_query: target = self.weight_embedding.tile([batch_size, 1, 1]) else: target = output_memory.gather(dim=1, index=topk_ind.unsqueeze(-1).repeat(1, 1, output_memory.shape[-1])) target = target.detach() if denoising_class is not None: target = torch.concat([denoising_class, target], 1) init_reference_points = reference_points_unact.detach() # decoder decoder_outputs = self.decoder( inputs_embeds=target, encoder_hidden_states=source_flatten, encoder_attention_mask=attention_mask, reference_points=init_reference_points, spatial_shapes=spatial_shapes, spatial_shapes_list=spatial_shapes_list, level_start_index=level_start_index, output_attentions=output_attentions, output_hidden_states=output_hidden_states, return_dict=return_dict, ) if not return_dict: enc_outputs = tuple( value for value in [enc_topk_logits, enc_topk_bboxes, enc_outputs_class, enc_outputs_coord_logits] if value is not None ) dn_outputs = tuple(value if value is not None else None for value in [denoising_meta_values]) tuple_outputs = decoder_outputs + encoder_outputs + (init_reference_points,) + enc_outputs + dn_outputs return tuple_outputs return RTDetrModelOutput( last_hidden_state=decoder_outputs.last_hidden_state, intermediate_hidden_states=decoder_outputs.intermediate_hidden_states, intermediate_logits=decoder_outputs.intermediate_logits, intermediate_reference_points=decoder_outputs.intermediate_reference_points, intermediate_predicted_corners=decoder_outputs.intermediate_predicted_corners, initial_reference_points=decoder_outputs.initial_reference_points, decoder_hidden_states=decoder_outputs.hidden_states, decoder_attentions=decoder_outputs.attentions, cross_attentions=decoder_outputs.cross_attentions, encoder_last_hidden_state=encoder_outputs.last_hidden_state, encoder_hidden_states=encoder_outputs.hidden_states, encoder_attentions=encoder_outputs.attentions, init_reference_points=init_reference_points, enc_topk_logits=enc_topk_logits, enc_topk_bboxes=enc_topk_bboxes, enc_outputs_class=enc_outputs_class, enc_outputs_coord_logits=enc_outputs_coord_logits, denoising_meta_values=denoising_meta_values, ) @auto_docstring( custom_intro=""" RT-DETR Model (consisting of a backbone and encoder-decoder) outputting bounding boxes and logits to be further decoded into scores and classes. """ ) class RTDetrForObjectDetection(RTDetrPreTrainedModel): # When using clones, all layers > 0 will be clones, but layer 0 *is* required _tied_weights_keys = ["bbox_embed", "class_embed"] # We can't initialize the model on meta device as some weights are modified during the initialization _no_split_modules = None def __init__(self, config: RTDetrConfig): super().__init__(config) # RTDETR encoder-decoder model self.model = RTDetrModel(config) # Detection heads on top self.class_embed = partial(nn.Linear, config.d_model, config.num_labels) self.bbox_embed = partial(RTDetrMLPPredictionHead, config, config.d_model, config.d_model, 4, num_layers=3) # if two-stage, the last class_embed and bbox_embed is for region proposal generation num_pred = config.decoder_layers if config.with_box_refine: self.class_embed = _get_clones(self.class_embed, num_pred) self.bbox_embed = _get_clones(self.bbox_embed, num_pred) else: self.class_embed = nn.ModuleList([self.class_embed() for _ in range(num_pred)]) self.bbox_embed = nn.ModuleList([self.bbox_embed() for _ in range(num_pred)]) # hack implementation for iterative bounding box refinement self.model.decoder.class_embed = self.class_embed self.model.decoder.bbox_embed = self.bbox_embed # Initialize weights and apply final processing self.post_init() @torch.jit.unused def _set_aux_loss(self, outputs_class, outputs_coord): # this is a workaround to make torchscript happy, as torchscript # doesn't support dictionary with non-homogeneous values, such # as a dict having both a Tensor and a list. return [{"logits": a, "pred_boxes": b} for a, b in zip(outputs_class, outputs_coord)] @auto_docstring def forward( self, pixel_values: torch.FloatTensor, pixel_mask: Optional[torch.LongTensor] = None, encoder_outputs: Optional[torch.FloatTensor] = None, inputs_embeds: Optional[torch.FloatTensor] = None, decoder_inputs_embeds: Optional[torch.FloatTensor] = None, labels: Optional[list[dict]] = None, output_attentions: Optional[bool] = None, output_hidden_states: Optional[bool] = None, return_dict: Optional[bool] = None, **kwargs, ) -> Union[tuple[torch.FloatTensor], RTDetrObjectDetectionOutput]: r""" inputs_embeds (`torch.FloatTensor` of shape `(batch_size, sequence_length, hidden_size)`, *optional*): Optionally, instead of passing the flattened feature map (output of the backbone + projection layer), you can choose to directly pass a flattened representation of an image. decoder_inputs_embeds (`torch.FloatTensor` of shape `(batch_size, num_queries, hidden_size)`, *optional*): Optionally, instead of initializing the queries with a tensor of zeros, you can choose to directly pass an embedded representation. labels (`list[Dict]` of len `(batch_size,)`, *optional*): Labels for computing the bipartite matching loss. List of dicts, each dictionary containing at least the following 2 keys: 'class_labels' and 'boxes' (the class labels and bounding boxes of an image in the batch respectively). The class labels themselves should be a `torch.LongTensor` of len `(number of bounding boxes in the image,)` and the boxes a `torch.FloatTensor` of shape `(number of bounding boxes in the image, 4)`. Examples: ```python >>> from transformers import RTDetrImageProcessor, RTDetrForObjectDetection >>> from PIL import Image >>> import requests >>> import torch >>> url = "http://images.cocodataset.org/val2017/000000039769.jpg" >>> image = Image.open(requests.get(url, stream=True).raw) >>> image_processor = RTDetrImageProcessor.from_pretrained("PekingU/rtdetr_r50vd") >>> model = RTDetrForObjectDetection.from_pretrained("PekingU/rtdetr_r50vd") >>> # prepare image for the model >>> inputs = image_processor(images=image, return_tensors="pt") >>> # forward pass >>> outputs = model(**inputs) >>> logits = outputs.logits >>> list(logits.shape) [1, 300, 80] >>> boxes = outputs.pred_boxes >>> list(boxes.shape) [1, 300, 4] >>> # convert outputs (bounding boxes and class logits) to Pascal VOC format (xmin, ymin, xmax, ymax) >>> target_sizes = torch.tensor([image.size[::-1]]) >>> results = image_processor.post_process_object_detection(outputs, threshold=0.9, target_sizes=target_sizes)[ ... 0 ... ] >>> for score, label, box in zip(results["scores"], results["labels"], results["boxes"]): ... box = [round(i, 2) for i in box.tolist()] ... print( ... f"Detected {model.config.id2label[label.item()]} with confidence " ... f"{round(score.item(), 3)} at location {box}" ... ) Detected sofa with confidence 0.97 at location [0.14, 0.38, 640.13, 476.21] Detected cat with confidence 0.96 at location [343.38, 24.28, 640.14, 371.5] Detected cat with confidence 0.958 at location [13.23, 54.18, 318.98, 472.22] Detected remote with confidence 0.951 at location [40.11, 73.44, 175.96, 118.48] Detected remote with confidence 0.924 at location [333.73, 76.58, 369.97, 186.99] ```""" output_attentions = output_attentions if output_attentions is not None else self.config.output_attentions output_hidden_states = ( output_hidden_states if output_hidden_states is not None else self.config.output_hidden_states ) return_dict = return_dict if return_dict is not None else self.config.use_return_dict outputs = self.model( pixel_values, pixel_mask=pixel_mask, encoder_outputs=encoder_outputs, inputs_embeds=inputs_embeds, decoder_inputs_embeds=decoder_inputs_embeds, labels=labels, output_attentions=output_attentions, output_hidden_states=output_hidden_states, return_dict=return_dict, ) denoising_meta_values = ( outputs.denoising_meta_values if return_dict else outputs[-1] if self.training else None ) outputs_class = outputs.intermediate_logits if return_dict else outputs[2] outputs_coord = outputs.intermediate_reference_points if return_dict else outputs[3] predicted_corners = outputs.intermediate_predicted_corners if return_dict else outputs[4] initial_reference_points = outputs.initial_reference_points if return_dict else outputs[5] logits = outputs_class[:, -1] pred_boxes = outputs_coord[:, -1] loss, loss_dict, auxiliary_outputs, enc_topk_logits, enc_topk_bboxes = None, None, None, None, None if labels is not None: enc_topk_logits = outputs.enc_topk_logits if return_dict else outputs[-5] enc_topk_bboxes = outputs.enc_topk_bboxes if return_dict else outputs[-4] loss, loss_dict, auxiliary_outputs = self.loss_function( logits, labels, self.device, pred_boxes, self.config, outputs_class, outputs_coord, enc_topk_logits=enc_topk_logits, enc_topk_bboxes=enc_topk_bboxes, denoising_meta_values=denoising_meta_values, predicted_corners=predicted_corners, initial_reference_points=initial_reference_points, **kwargs, ) if not return_dict: if auxiliary_outputs is not None: output = (logits, pred_boxes) + (auxiliary_outputs,) + outputs else: output = (logits, pred_boxes) + outputs return ((loss, loss_dict) + output) if loss is not None else output return RTDetrObjectDetectionOutput( loss=loss, loss_dict=loss_dict, logits=logits, pred_boxes=pred_boxes, auxiliary_outputs=auxiliary_outputs, last_hidden_state=outputs.last_hidden_state, intermediate_hidden_states=outputs.intermediate_hidden_states, intermediate_logits=outputs.intermediate_logits, intermediate_reference_points=outputs.intermediate_reference_points, intermediate_predicted_corners=outputs.intermediate_predicted_corners, initial_reference_points=outputs.initial_reference_points, decoder_hidden_states=outputs.decoder_hidden_states, decoder_attentions=outputs.decoder_attentions, cross_attentions=outputs.cross_attentions, encoder_last_hidden_state=outputs.encoder_last_hidden_state, encoder_hidden_states=outputs.encoder_hidden_states, encoder_attentions=outputs.encoder_attentions, init_reference_points=outputs.init_reference_points, enc_topk_logits=outputs.enc_topk_logits, enc_topk_bboxes=outputs.enc_topk_bboxes, enc_outputs_class=outputs.enc_outputs_class, enc_outputs_coord_logits=outputs.enc_outputs_coord_logits, denoising_meta_values=outputs.denoising_meta_values, ) __all__ = [ "RTDetrForObjectDetection", "RTDetrModel", "RTDetrPreTrainedModel", ]