L iddlZddlZddlZddlZddlZddlZddlZddlZddlZddl m Z m Z ddl m Z ddlmZmZmZddlmZmZddlmZmZmZmZddlZddlmZddlmZddl m!Z!m"Z"dd l#m$Z$m%Z%m&Z&dd l'm(Z(dd l)m*Z*m+Z+dd l,m-Z-m.Z.d Z/ej`rddl1m2Z2m3Z3m4Z4ddl5m6Z6m7Z7m8Z8m9Z9m:Z:m;Z;ejxjardZ/ddl=m>Z>erddl?m@Z@dgZAejeCZDeGddZEdZFdZGdZHdZIdZJdZKGddeZLeGddZMGd d!e!ZNGd"d#e&ZOGd$de(e%ZPy)%N) defaultdictdeque)contextmanager) dataclassfields is_dataclass)autoEnum)AnyCallableOptional TYPE_CHECKING)_get_device_index)FunctionVariable)JoinJoinableJoinHook)Module)gatherscatter_kwargs) tree_flattentree_unflattenF)_get_default_group_rank_not_in_groupReduceOp)_alloc_storage_cast_forward_inputs _free_storage_sync_module_states _to_kwargs$_verify_param_shape_across_processesT)RRef)RemovableHandleDistributedDataParallelceZdZUdZdZeejed<dZ eejed<dZ eejed<y)_MixedPrecisiona This configures DDP-native mixed precision training. Attributes: param_dtype (torch.dtype): This specifies the dtype for model parameters, inputs (when ``cast_forward_inputs`` is set to ``True``), and therefore the dtype for computation. However, outside the forward and backward passes, parameters are in full precision. Model checkpointing always happens in full precision. reduce_dtype (torch.dtype): This specifies the dtype for gradient reduction, which is permitted to differ from ``param_dtype``. buffer_dtype (torch.dtype): This specifies the dtype for buffers. .. note:: This API is experimental and subject to change. .. note:: Only floating point tensors are cast to their specified dtypes. .. note:: ``state_dict`` checkpoints parameters and buffers in full precision. .. note:: Each low precision dtype must be specified explicitly. For example, ``_MixedPrecision(reduce_dtype=torch.float16)`` only specifies the reduction dtype to be low precision, and DDP will not cast parameters or buffers. .. note:: If a ``reduce_dtype`` is not specified, then gradient reduction happens in ``param_dtype`` if specified or the original parameter dtype otherwise. For example, ``_MixedPrecision(param_dtype=torch.float16)`` would result in communication occurring in fp16. N param_dtype reduce_dtype buffer_dtype) __name__ __module__ __qualname____doc__r(r torchdtype__annotations__r)r*c/mnt/ssd/data/python-lab/Trading/venv/lib/python3.12/site-packages/torch/nn/parallel/distributed.pyr'r'7sC@*.K%++&-*.L(5;;'.*.L(5;;'.r3r'c|jD]<}t|dr |jr|j|j|_>y)z,Casts buffers to the given ``buffer_dtype``. _ddp_ignored)r0N)buffershasattrr6tor*data)mixed_precision_config root_modulebufs r4 _cast_buffersr>bsJ""$E 3 'C,<,< 66 6 C C6D Er3c>|jD]}t|dr |jrt|dr)tj||j |j |j|_t|j|j|_ y)z;Create and free storage for the mixed precision parameters.r6 _mp_param)devicer0 requires_gradN) parametersr8r6r/ zeros_likerAr(rBr@rr: _fp_param)r;r<params r4_setup_mixed_precision_paramsrGks'')) 5. )e.@.@ uk*#..||,88#11 EO %// *$jjEO)r3ctxrt|t}|rt|j \}}nt|\}}|||fSN) RPC_AVAILABLE isinstancer#r local_value)outputoutput_is_rrefoutput_tensor_listtreespecs r4_tree_flatten_with_rrefrQsN"?z&$'?N'3F4F4F4H'I$H'3F';$H x 77r3c8t||}|r t|}|SrI)rr#)rMrPrNs r4_tree_unflatten_with_rrefrSs FH -Ff Mr3c ptr9ttr)jrt j Stt jrgStttfr-tjjttSttr;tjjttjSt!r?tjjttfdt#DSgS)z?Recursively find all tensors contained in the specified object.c3JK|]}t|jywrI)getattrname).0fobjs r4 z _find_tensors..sJQVV 4Js #)rJrKr#is_owner _find_tensorsrLr/Tensorlisttuple itertoolschain from_iterablemapdictvaluesrr)rZs`r4r]r]sC. <<> !23 3#u||$u #e}%,,S-DEE#t,,S -MNNC,,  JfSkJ K   Ir3cgd}d}|D]5}|tjvrtj|nd}|d|d|dz }7t|y)N)/RANK LOCAL_RANK WORLD_SIZE MASTER_PORT MASTER_ADDRCUDA_VISIBLE_DEVICESGLOO_SOCKET_IFNAMEGLOO_DEVICE_TRANSPORTNCCL_SOCKET_IFNAMETORCH_NCCL_BLOCKING_WAIT NCCL_DEBUGNCCL_DEBUG_SUBSYSNCCL_IB_DISABLENCCL_P2P_DISABLENCCL_P2P_LEVELNCCL_SHM_DISABLENCCL_SOCKET_NTHREADSNCCL_NSOCKS_PERTHREAD NCCL_BUFFSIZE NCCL_NTHREADS NCCL_RINGSNCCL_MAX_NCHANNELSNCCL_MIN_NCHANNELSNCCL_CHECKS_DISABLENCCL_CHECK_POINTERSNCCL_LAUNCH_MODE NCCL_IB_HCANCCL_IB_TIMEOUTNCCL_IB_RETRY_CNTNCCL_IB_GID_INDEX NCCL_IB_SL NCCL_IB_TCNCCL_IB_AR_THRESHOLDNCCL_IB_CUDA_SUPPORTNCCL_NET_GDR_LEVELNCCL_NET_GDR_READNCCL_SINGLE_RING_THRESHOLDNCCL_LL_THRESHOLDNCCL_TREE_THRESHOLD NCCL_ALGO NCCL_PROTONCCL_IGNORE_CPU_AFFINITYNCCL_DEBUG_FILENCCL_COLLNET_ENABLENCCL_TOPO_FILENCCL_TOPO_DUMP_FILETORCH_NCCL_ASYNC_ERROR_HANDLINGzN/Azenv:= )osenvironprint)relevant_env_varsformatted_outputvarvalues r4_dump_DDP_relevant_env_varsrsa1d 3#&"**#4 3%d3%qr223 r3c(eZdZeZeZy)_BufferCommHookLocationN)r+r,r-r PRE_FORWARD POST_FORWARDr2r3r4rrs&K6Lr3rc,eZdZUeed<eed<eed<y)_BufferCommHookbuffer_comm_hookbuffer_comm_hook_statebuffer_comm_hook_locationN)r+r,r-r r1r rr2r3r4rrs66r3rc,eZdZedZedZy)_DDPSinkc|jd||_|}|jrtd|D}|S)NFc3tK|]0}t|tjr|jn|2ywrI)rKr/r^clone)rXinps r4r[z#_DDPSink.forward..s-JMz#u||< #Es68)set_materialize_grads ddp_weakref_ddp_sink_cloner`)ctxrinputsrets r4forwardz_DDPSink.forwardsG !!%(% = ( (QWC r3c|j}|j}|j}|xr |j}|r2|s0tj j |jd|_dg|SNT)rreducer static_graph&_static_graph_delay_allreduce_enqueuedr_execution_enginequeue_callback_delay_all_reduce)r grad_outputsrrrdelay_ar_enqueueds r4backwardz_DDPSink.backwardsxoo' %%"//  O[OO   1  & & 5 5)) BFK >$|$$r3N)r+r,r- staticmethodrrr2r3r4rrs(  %%r3rc0eZdZfdZdZdefdZxZS) _DDPJoinHookct|tsJd|jJ|jj||_||j_t |y)z(Set config variables for internal usage.zQDDP join hook requires passing in a DistributedDataParallel instance as the stateN)rKr%logger_set_uneven_input_joinddp_divide_by_initial_world_sizesuper__init__)selfrdivide_by_initial_world_size __class__s r4rz_DDPJoinHook.__init__s`#67  $ 7zz%%% ))+1M. r3c4|j}|jj|j|j d}||_|sy|j |jr|j|jjy)zVShadow the DDP collective communication operations in the forward and backward passes.Tis_joined_rankN) rr_rebuild_buckets_check_and_sync_module_buffers)_check_global_requires_backward_grad_syncrequire_forward_param_sync_match_all_reduce_for_bwd_passfind_unused_parameters_match_unused_params_allreduce_push_all_rebuilt_params)rrshould_sync_backwardss r4 main_hookz_DDPJoinHook.main_hookshh $$& **,!$ M M!N!  *?&$  **,  % %  . . 0 ,,.r3is_last_joinerc:|jj|y)zOSync the final model to ensure that the model is the same across all processes.N)r_sync_final_modelrrs r4 post_hookz_DDPJoinHook.post_hookAs "">2r3)r+r,r-rrboolr __classcell__rs@r4rrs  /D33r3rceZdZUdZdZeded< dRdeeffd ZdZ dZ d Z d Z d Z d ed eddfdZd ed eddfdZdZdZdZfdZdZdZdZdSdZdZedZedZeej>ddZ dZ!dZ"d Z#d!Z$d"Z%d#Z&d$Z'd%Z(d&Z)dSfd' Z*d(Z+d)Z,d*Z-d+Z.d,Z/ dTd-e0d.e0d/e0fd0Z1d1Z2e3d2Z4e3d3Z5e6jnfd4e8fd5Z9d6e:d4e8fd7Z;d8Z dUd<Z?d=Z@d>ZAd?ZBd@ZCdAZDdBZE dVdCZFdDZGdEZHe3dFZIeJdWdGZKeJdHZLdIZMdJZNdKZOdLZPdMZQdNZRdOZSdPe0fdQZTxZUS)Xr%aFImplement distributed data parallelism based on ``torch.distributed`` at module level. This container provides data parallelism by synchronizing gradients across each model replica. The devices to synchronize across are specified by the input ``process_group``, which is the entire world by default. Note that ``DistributedDataParallel`` does not chunk or otherwise shard the input across participating GPUs; the user is responsible for defining how to do so, for example through the use of a :class:`DistributedSampler`. See also: :ref:`distributed-basics` and :ref:`cuda-nn-ddp-instead`. The same constraints on input as in :class:`torch.nn.DataParallel` apply. Creation of this class requires that ``torch.distributed`` to be already initialized, by calling :func:`torch.distributed.init_process_group`. ``DistributedDataParallel`` is proven to be significantly faster than :class:`torch.nn.DataParallel` for single-node multi-GPU data parallel training. To use ``DistributedDataParallel`` on a host with N GPUs, you should spawn up ``N`` processes, ensuring that each process exclusively works on a single GPU from 0 to N-1. This can be done by either setting ``CUDA_VISIBLE_DEVICES`` for every process or by calling the following API for GPUs, >>> # xdoctest: +SKIP("undefined variables") >>> torch.cuda.set_device(i) or calling the unified API for :ref:`accelerator`, >>> # xdoctest: +SKIP("undefined variables") >>> torch.accelerator.set_device_index(i) where i is from 0 to N-1. In each process, you should refer the following to construct this module: >>> # xdoctest: +SKIP("undefined variables") >>> if torch.accelerator.is_available(): >>> device_type = torch.accelerator.current_accelerator().type >>> vendor_backend = torch.distributed.get_default_backend_for_device(device_type) >>> >>> torch.distributed.init_process_group( >>> backend=vendor_backend, world_size=N, init_method='...' >>> ) >>> model = DistributedDataParallel(model, device_ids=[i], output_device=i) Or you can use the latest API for initialization: >>> torch.distributed.init_process_group(device_id=i) In order to spawn up multiple processes per node, you can use either ``torch.distributed.launch`` or ``torch.multiprocessing.spawn``. .. note:: Please refer to `PyTorch Distributed Overview `__ for a brief introduction to all features related to distributed training. .. note:: ``DistributedDataParallel`` can be used in conjunction with :class:`torch.distributed.optim.ZeroRedundancyOptimizer` to reduce per-rank optimizer states memory footprint. Please refer to `ZeroRedundancyOptimizer recipe `__ for more details. .. note:: ``nccl`` backend is currently the fastest and highly recommended backend when using GPUs. This applies to both single-node and multi-node distributed training. .. note:: This module also supports mixed-precision distributed training. This means that your model can have different types of parameters such as mixed types of ``fp16`` and ``fp32``, the gradient reduction on these mixed types of parameters will just work fine. .. note:: If you use ``torch.save`` on one process to checkpoint the module, and ``torch.load`` on some other processes to recover it, make sure that ``map_location`` is configured properly for every process. Without ``map_location``, ``torch.load`` would recover the module to devices where the module was saved from. .. note:: When a model is trained on ``M`` nodes with ``batch=N``, the gradient will be ``M`` times smaller when compared to the same model trained on a single node with ``batch=M*N`` if the loss is summed (NOT averaged as usual) across instances in a batch (because the gradients between different nodes are averaged). You should take this into consideration when you want to obtain a mathematically equivalent training process compared to the local training counterpart. But in most cases, you can just treat a DistributedDataParallel wrapped model, a DataParallel wrapped model and an ordinary model on a single GPU as the same (E.g. using the same learning rate for equivalent batch size). .. note:: Parameters are never broadcast between processes. The module performs an all-reduce step on gradients and assumes that they will be modified by the optimizer in all processes in the same way. Buffers (e.g. BatchNorm stats) are broadcast from the module in process of rank 0, to all other replicas in the system in every iteration. .. note:: If you are using DistributedDataParallel in conjunction with the :ref:`distributed-rpc-framework`, you should always use :meth:`torch.distributed.autograd.backward` to compute gradients and :class:`torch.distributed.optim.DistributedOptimizer` for optimizing parameters. Example:: >>> # xdoctest: +SKIP("undefined variables") >>> import torch.distributed.autograd as dist_autograd >>> from torch.nn.parallel import DistributedDataParallel as DDP >>> import torch >>> from torch import optim >>> from torch.distributed.optim import DistributedOptimizer >>> import torch.distributed.rpc as rpc >>> from torch.distributed.rpc import RRef >>> >>> t1 = torch.rand((3, 3), requires_grad=True) >>> t2 = torch.rand((3, 3), requires_grad=True) >>> rref = rpc.remote("worker1", torch.add, args=(t1, t2)) >>> ddp_model = DDP(my_model) >>> >>> # Setup optimizer >>> optimizer_params = [rref] >>> for param in ddp_model.parameters(): >>> optimizer_params.append(RRef(param)) >>> >>> dist_optim = DistributedOptimizer( >>> optim.SGD, >>> optimizer_params, >>> lr=0.05, >>> ) >>> >>> with dist_autograd.context() as context_id: >>> pred = ddp_model(rref.to_here()) >>> loss = loss_func(pred, target) >>> dist_autograd.backward(context_id, [loss]) >>> dist_optim.step(context_id) .. note:: DistributedDataParallel currently offers limited support for gradient checkpointing with :meth:`torch.utils.checkpoint`. If the checkpoint is done with use_reentrant=False (recommended), DDP will work as expected without any limitations. If, however, the checkpoint is done with use_reentrant=True (the default), DDP will work as expected when there are no unused parameters in the model and each layer is checkpointed at most once (make sure you are not passing `find_unused_parameters=True` to DDP). We currently do not support the case where a layer is checkpointed multiple times, or when there unused parameters in the checkpointed model. .. note:: To let a non-DDP model load a state dict from a DDP model, :meth:`~torch.nn.modules.utils.consume_prefix_in_state_dict_if_present` needs to be applied to strip the prefix "module." in the DDP state dict before loading. .. warning:: Constructor, forward method, and differentiation of the output (or a function of the output of this module) are distributed synchronization points. Take that into account in case different processes might be executing different code. .. warning:: This module assumes all parameters are registered in the model by the time it is created. No parameters should be added nor removed later. Same applies to buffers. .. warning:: This module assumes all parameters are registered in the model of each distributed processes are in the same order. The module itself will conduct gradient ``allreduce`` following the reverse order of the registered parameters of the model. In other words, it is users' responsibility to ensure that each distributed process has the exact same model and thus the exact same parameter registration order. .. warning:: This module allows parameters with non-rowmajor-contiguous strides. For example, your model may contain some parameters whose :class:`torch.memory_format` is ``torch.contiguous_format`` and others whose format is ``torch.channels_last``. However, corresponding parameters in different processes must have the same strides. .. warning:: This module doesn't work with :func:`torch.autograd.grad` (i.e. it will only work if gradients are to be accumulated in ``.grad`` attributes of parameters). .. warning:: If you plan on using this module with a ``nccl`` backend or a ``gloo`` backend (that uses Infiniband), together with a DataLoader that uses multiple workers, please change the multiprocessing start method to ``forkserver`` (Python 3 only) or ``spawn``. Unfortunately Gloo (that uses Infiniband) and NCCL2 are not fork safe, and you will likely experience deadlocks if you don't change this setting. .. warning:: You should never try to change your model's parameters after wrapping up your model with ``DistributedDataParallel``. Because, when wrapping up your model with ``DistributedDataParallel``, the constructor of ``DistributedDataParallel`` will register the additional gradient reduction functions on all the parameters of the model itself at the time of construction. If you change the model's parameters afterwards, gradient reduction functions no longer match the correct set of parameters. .. warning:: Using ``DistributedDataParallel`` in conjunction with the :ref:`distributed-rpc-framework` is experimental and subject to change. Args: module (Module): module to be parallelized device_ids (list of int or torch.device): CUDA devices. 1) For single-device modules, ``device_ids`` can contain exactly one device id, which represents the only CUDA device where the input module corresponding to this process resides. Alternatively, ``device_ids`` can also be ``None``. 2) For multi-device modules and CPU modules, ``device_ids`` must be ``None``. When ``device_ids`` is ``None`` for both cases, both the input data for the forward pass and the actual module must be placed on the correct device. (default: ``None``) output_device (int or torch.device): Device location of output for single-device CUDA modules. For multi-device modules and CPU modules, it must be ``None``, and the module itself dictates the output location. (default: ``device_ids[0]`` for single-device modules) broadcast_buffers (bool): Flag that enables syncing (broadcasting) buffers of the module at beginning of the ``forward`` function. (default: ``True``) init_sync (bool): Whether to sync during initialization to verify param shapes and broadcast parameters and buffers. WARNING: if this is set to False the user is required to ensure themselves that the weights are the same on all ranks. (default: ``True``) process_group: The process group to be used for distributed data all-reduction. If ``None``, the default process group, which is created by :func:`torch.distributed.init_process_group`, will be used. (default: ``None``) bucket_cap_mb: ``DistributedDataParallel`` will bucket parameters into multiple buckets so that gradient reduction of each bucket can potentially overlap with backward computation. :attr:`bucket_cap_mb` controls the bucket size in MebiBytes (MiB). If ``None``, a default size of 25 MiB will be used. (default: ``None``) find_unused_parameters (bool): Traverse the autograd graph from all tensors contained in the return value of the wrapped module's ``forward`` function. Parameters that don't receive gradients as part of this graph are preemptively marked as being ready to be reduced. In addition, parameters that may have been used in the wrapped module's ``forward`` function but were not part of loss computation and thus would also not receive gradients are preemptively marked as ready to be reduced. (default: ``False``) check_reduction: This argument is deprecated. gradient_as_bucket_view (bool): When set to ``True``, gradients will be views pointing to different offsets of ``allreduce`` communication buckets. This can reduce peak memory usage, where the saved memory size will be equal to the total gradients size. Moreover, it avoids the overhead of copying between gradients and ``allreduce`` communication buckets. When gradients are views, ``detach_()`` cannot be called on the gradients. If hitting such errors, please fix it by referring to the :meth:`~torch.optim.Optimizer.zero_grad` function in ``torch/optim/optimizer.py`` as a solution. Note that gradients will be views after first iteration, so the peak memory saving should be checked after first iteration. static_graph (bool): When set to ``True``, DDP knows the trained graph is static. Static graph means 1) The set of used and unused parameters will not change during the whole training loop; in this case, it does not matter whether users set ``find_unused_parameters = True`` or not. 2) How the graph is trained will not change during the whole training loop (meaning there is no control flow depending on iterations). When static_graph is set to be ``True``, DDP will support cases that can not be supported in the past: 1) Reentrant backwards. 2) Activation checkpointing multiple times. 3) Activation checkpointing when model has unused parameters. 4) There are model parameters that are outside of forward function. 5) Potentially improve performance when there are unused parameters, as DDP will not search graph in each iteration to detect unused parameters when static_graph is set to be ``True``. To check whether you can set static_graph to be ``True``, one way is to check ddp logging data at the end of your previous model training, if ``ddp_logging_data.get("can_set_static_graph") == True``, mostly you can set ``static_graph = True`` as well. Example:: >>> # xdoctest: +SKIP("undefined variables") >>> model_DDP = torch.nn.parallel.DistributedDataParallel(model) >>> # Training loop >>> ... >>> ddp_logging_data = model_DDP._get_ddp_logging_data() >>> static_graph = ddp_logging_data.get("can_set_static_graph") delay_all_reduce_named_params (list of tuple of str and torch.nn.Parameter): a list of named parameters whose all reduce will be delayed when the gradient of the parameter specified in ``param_to_hook_all_reduce`` is ready. Other arguments of DDP do not apply to named params specified in this argument as these named params will be ignored by DDP reducer. param_to_hook_all_reduce (torch.nn.Parameter): a parameter to hook delayed all reduce of parameters specified in ``delay_all_reduce_named_params``. skip_all_reduce_unused_params: When set to True, DDP will skip reducing unused parameters. This requires that unused parameters remain the same across all ranks throughout the entire training process. If this condition is not met, it may cause desynchronization and result in training hang. Attributes: module (Module): the module to be parallelized. Example:: >>> # xdoctest: +SKIP("undefined variables") >>> torch.distributed.init_process_group(backend='nccl', world_size=4, init_method='...') >>> net = torch.nn.parallel.DistributedDataParallel(model) N_active_ddp_moduleFmixed_precisionc t!|tj|tjj j dk(|_d|_t| dut|duk7r|jtd|r | td||t|_np|||_nf|jdk7rtd|d||_|j#d|_dd lm}|j)|}||k7rdd lm}||g|_t1|d rt3|j4|_nt3|_| @| D];\}}|j6j9||j.j;|=|j=Dcgc]\}}||j6vr|c}}|_tAd |j>DsAtC|j.rtjEd n|jtd|$tC|dkDr|jtdtC|j>Dchc]}|jFc}dkD|_$|j>Dchc]%}|jF|jFjJ'}}tC|dk7r|jtd|dtMtO||_(|)tC|dk(s|jPdk(s |jHrU|s|rB|jtd|d|d|j>Dchc]}|jFc}dd|_)d|_*n7|Dcgc]}tW|dc}|_)||d}tW|d|_*d|_,||_-||_.tMtO|j>jF|_#||_/| |_0d|_1d|_2| |_3||_4|jh tjkd|jh| rtmjndtpd|j>D]G}ts|tjtjvjxs2|jtdIt{d|_>| d}d|_?nd|_?t{|dzdz|_@tjjdd d k(|_Dd|_Eg|_Fd|_GtC|j.dk7r!|j|||!|jry||_I|j\}}|rYt|j|t|j\|j|j|d|j6|j^"|j|}|j|||| g|_O|jhJt|jh|j\t|jh|j\tj|_Stt|_V|j\j|jdd#|j\jD] }|j|jdd#"dd$l[m\}m]}|tj|tj%} |j| ||jj|jhjd|_d| r|jd|_fg|_g|jrdtjj_j|tjj_ktjjjj9d&tjjjj|jd|_qycc}}wcc}wcc}wcc}wcc}w)'Npython_reducerz[delay_all_reduce_named_params and param_to_hook_all_reduce need to be set at the same time.z.sDq1??Dz&Delay the AllReduce of all parameters.zhDistributedDataParallel is not needed when a module doesn't have any parameter that requires a gradient.z8device_ids can only be None or contain a single element.zrDistributedDataParallel's input module must be on the same type of devices, but input module parameters locate in cpuzDistributedDataParallel device_ids and output_device arguments only work with single-device/multiple-device GPU modules or CPU modules, but got device_ids z, output_device z, and module parameters TFz"Received mixed precision config %szhThe `check_reduction` argument in `DistributedDataParallel` module is deprecated. Please avoid using it.) stacklevelzModules with uninitialized parameters can't be used with `DistributedDataParallel`. Run a dummy forward pass to correctly initialize the modulesiiPYTORCH_DDP_USE_SIDE_STREAM1) bucket_cap_mbparam_to_hook_all_reduce device_idsmodule process_groupbroadcast_bucket_sizesrcparams_and_buffers_to_ignorebroadcast_buffers)prepend with_kwargs)_AllreduceUpcastHookState"_reducer_allreduce_and_upcast_hook)r upcast_streamztorch.nn.parallel.distributed)rrrrr/_dynamoutilsget_optimize_ddp_mode_use_python_reducerrr_log_and_throw ValueError RuntimeErrorrrndim device_mesh get_grouptorch.distributed.device_meshr get_root_mesh%torch.distributed.tensor.parallel.ddpr_delay_all_reduce_paramsr8setrparameters_to_ignoreaddappendnamed_parameters_module_parametersanyleninforAis_multi_device_moduletypenextiter device_typer output_devicerrdimrrrrequire_backward_grad_syncrgradient_as_bucket_viewrwarningwarningswarn FutureWarningrKnn parameterUninitializedParameterintrbucket_bytes_cap_defaultbucket_bytes_caprrget!use_side_stream_for_tensor_copies_delay_grad_buffer_delay_grad_views_delay_all_reduce_all_params_register_delay_all_reduce_hookskip_all_reduce_unused_params_build_params_for_reducerr"r "_build_debug_param_to_name_mapping_ddp_init_helper _comm_hooksrGr>Stream _mp_streamrr_submodule_to_eventregister_forward_pre_hook_root_copy_hookmodules_module_wait_for_copy_hookAtorch.distributed.algorithms.ddp_comm_hooks.mixed_precision_hooksrrweakrefrefregister_comm_hookr _set_mixed_precision_param_dtyper(_has_rebuilt_buckets_set_static_graph_lazy_init_ran_accum_grad_hooks _inductorconfig_fuse_ddp_communication_fuse_ddp_bucket_size trace_rulesLEGACY_MOD_INLINELISTget_legacy_mod_inlinelist cache_clear_register_accum_grad_hookr)"rrrr%r&r init_syncrrrcheck_reductionr(rdelay_all_reduce_named_paramsrrrr9r root_meshrrWrFnrdistinct_device_typesxrCexpect_sparse_gradientparam_to_name_mappingrrupcast_hook_staters" r4rz DistributedDataParallel.__init__s( $ MM   5 5 7;K K  .2 -T9 :d $D 0?     3  [4N  "{':!3!5D   !.D 1$"@ QO +D !,!6!6!6!BD  E'55kBIK' )0(*% 6> ?(+F,T,T(UD %(+D % ( 4< < e))--d3--44U; < //1# 1111 #  DD,C,CDD4001 DE## K  !c*o&9   J  4#:#:;a; RAHHMM! !  $ % *   SShRiijl   %: ;<  :!#5(**]##**45Em_U-BFBYBY/ZQ/Z,]]^`#DO!%D CMNa0D9NDO$ *1 !2=$!GD ! 4 7 789@@ !2&<#*.'*.''>$.    + NN?AUAU V  MM?  ,, E%!3!3!J!JK## S &)):%;"  M,0D ),1D ) #MD$84$? @ JJNN8# ># E . ;?57,1) t,, - 2  0 0+)A% 1  00-J*.2-K-K-M* *  01C1CZ P {{"00&*&@&@-1-F-F"&"8"8  !% G G S   " !   ;=    + )$*>*> L $.. <#llnDO'25'9D $ KK 1 1$$e 2  ++--/ 0033! $1   !:#KK-#lln!   # #!2  LL 9 9$$00 %*!   " " $# 9;  # #=AEOO " " :;HEOO " " 8 MM % % ; ; ? ?/  MM % % ? ? K K M  * * , $y# ,<! 00[ Os$2a +a&a+-a+a0 a5c ddlmcmdtffd }t j D]Q\}}|j sjj|jtj||Sy)Nr param_indexcdjsy|jyjr*jD]\}}|||j|fy|jjj z }j |dj}|jj |y)Nsum)r'gradr=rsize all_reducecopy_)rFrbhookstategradientfcolrs r4compiled_accum_grad_hookzSDistributedDataParallel._register_accum_grad_hook..compiled_accum_grad_hooks 22zz!#'#3#35KD%U 345!::(:(:(?(?(AA??8UD LE5&&  " " ) )88%%0$)  r3ctj|j}|jj |tj |j|jd}|S)NTgroupasync_op)distget_world_sizerr5div_rg)rre world_size_s r4_delayed_all_reduce_hookz0DistributedDataParallel._delayed_all_reduce_hooksT((););<  $$Z0 OO  # #4+=+=  r3c>|tjdn|d}tjtd|jD||_|jDcgc]}|j }}tj|j||d|j|jd}|jD]i}|j |||jzj|j} |jj!| ||jz}k|j"j%D]D\} } | j'dD]*\} }|j(s| d| } | |j*vs)yFd|_ycc}w) Nrrc3<K|]}|jywrI)numelrs r4r[zJDistributedDataParallel._register_delay_all_reduce_hook..sAa AsrAFrecurserT)r/rAzerosrdrr5detachrz_broadcast_coalescedr register_hookrrviewshaper6rr named_modulesrrBrr7)rrrrrArdetached_paramsoffsetrF grad_view module_namer param_name full_names r4r8z7DistributedDataParallel._register_delay_all_reduce_hooks)3(:e$ 1 "'++ A4#@#@A A#  04/L/LM!188:MM !!$"4"4o}VWX !..t/L/LM22 ,E//&5;;=:PRWW I  " " ) )) 4ekkm+F  ,$(;;#<#<#>  K%+%<%.sVwq34Vrzddp.optimizer_in_backwardr)_apply_optim_in_backward_hook)gradient_is_bucket_view)rrr/_C_log_api_usage_oncerzoptimapply_optimizer_in_backwardparam_to_optim_hook_handle_mapr3removerFrGCtorch.distributed.algorithms.ddp_comm_hooks.optimizer_overlap_hooksrrHr(r_set_optimizer_in_backward)rparam_to_handle_maprhandlerrs r4_setup_in_backward_optimizersz5DistributedDataParallel._setup_in_backward_optimizers s Vd>U>UV V HH ( ()D E  66UU ,, $155a<$FMMO$ $ "++d+K   # #-,0,H,H  LL 3 3 59 Wr3c:|jj|y)a7 Fire the reducer's autograd hook to allreduce params in a Reducer bucket. Note that this is only used during mixed precision training as the Reducer's hooks installed during construction time would not be called as we're working in the low precision parameter setting. N)r_autograd_hook)ridxunuseds r4_fire_reducer_autograd_hookz3DistributedDataParallel._fire_reducer_autograd_hook2s ##C(r3argskwargsreturnc 0tt|_|j5|jj D]6}|j dD]}t|dr |jrt|j|jtj5|jj|j|j >|j j#|j$j&|j _ddd|j|_tj(}|j+|j|j-|9 dddy#1swYkxYw#1swYyxYw)a: For DDP mixed precision, put low precision copies on separate stream and create events to wait for them. When training with DDP mixed precision, this root pre-forward hook kicks off low precision copies on a separate stream and creates respective events to wait for them. Frr6N)rrr@r?rrCrCr8r6rr@rfr/no_gradrhr:rer9rr(Eventrecordr)rrr submodulerF copy_events r4rBz'DistributedDataParallel._root_copy_hook<sA$/u#5 __ G![[002 G &11%1@1Eun5%:L:L "5??EJJLA--ejj9!::1.3jjmm $ 4 4 @ @/EJJO"'EJ+1,#[[] !!#((3:::F3 G G G G Gs&BF &A0F A F F F  Fc |j|j}|jtj j |jdD]}|jrt|dr |jr(|j|}|jjdd}|jtj |j"|j$}||f|_y#t$rYywxYw)zlBefore carrying out computation, wait on the appropriate event to ensure low precision copies have finished.N)streamFrr6r)r@popleft IndexErrorwaitr/ acceleratorcurrent_streamrCrBr8r6 expand_asgrad_fnnext_functionsrrsrtr_idx_ddp_mp_hook_state) rrrreventrtmpgrad_accris r4rDz2DistributedDataParallel._module_wait_for_copy_hookds ,,V4<<>E  %++::< =""5"1 4A??wq.'Aann++a.C{{11!4Q7H))!!$"B"BAFFKD%-d#3A  4    sC99 DDc||j)|jjt|d|||)Nz: )rset_error_and_logstr)rerr_typeerr_msgs r4r z&DistributedDataParallel._log_and_throws8 ;; " KK ) )S]O2gY*G Hwr3c|dus|jdurtjg}n6|jrtj |j g}n |j g}t j|||\}}|jt|D] \}} || _ t j|tt|tt||j||j |j|j||jrtj n |j |j |j" |_t j&|j$|_|j$j+|j(d} |j,j/D]*} t1| t2j4j6s(d} n|j(j9|j,j:j<|j>gn |j>|j@dn |j@|jB| ||jE|j,y)a DDP init helper function to manage parameters, grad hooks, logging, and SyncBatchNorm. Initialization helper function that does the following: (1) bucketing the parameters for reductions (2) resetting the bucketing states (3) registering the grad hooks (4) Logging construction-time DDP logging data (5) passing a handle of DDP to SyncBatchNorm Layer TFN)#rsysmaxsizer1rz_DEFAULT_FIRST_BUCKET_BYTESr2"_compute_bucket_assignment_by_sizerrqrReducerr_reversedrr(r9r rLoggerr set_loggerrrCrKr/r- SyncBatchNormset_construction_data_and_logrr+rr%r_passing_sync_batchnorm_handle) rrCr^r_rbucket_size_limitsbucket_indicesper_bucket_size_limitsir has_sync_bnrs r4r<z(DistributedDataParallel._ddp_init_helpersH 4 4#>#>%#G"%++ ,,44))&" '+&;&;%<"  3 3   "   "    +!*- 1  ||  .) * 01 2    "  ! !  ' '  ( ( ! 0000**  . .  $ $/  4kk$,,/   , ,,. I)UXX%;%;<"   11 KK ! ! * *//)Bt$$,B$2D2D  " "     ++DKK8r3cv|jtj|j}|d=|d=|d=|S)Nrrr)_check_default_groupcopy__dict__)rattrss r4 __getstate__z$DistributedDataParallel.__getstate__s< !!# $--( / " )  (O r3ct|_t| ||jj dd|jj dd|j \}}|j|}|j||||j|jrC|jj|jJ|jjyy)NrTr') rrr __setstate__r setdefaultr:r;r<rrrKr)rrjrCr^r_rs r4rz$DistributedDataParallel.__setstate__s/1 U#   !=tD   !=tD-1-K-K-M* * $ G G S   " !        LL * * ,;;* ** KK ) ) + r3c2|jjDcgc]R\}}|jdDcgc]&\}}|jr|d||jvr|(c}}D]}||fT}}}}}}t }|D cgc]\}} | |vr|j | s|| f }}} |D cgc]\} }| } } }d} |D cgc] \}} | |} }} |j| | fScc}}wcc}}}}}wcc} }wcc}} wcc} }w)NFrrct|tjjtjjfr |j SyNF)rKr/r- Embedding EmbeddingBagsparse)rs r4produces_sparse_gradientzSDistributedDataParallel._build_params_for_reducer..produces_sparse_gradient4s2&588#5#5uxx7L7L"MN}}$r3)rrrrBrrr_assign_modules_buffers)rrrr.rrFmodules_and_parametersmemomrr~rCrr^s r4r:z1DistributedDataParallel._build_params_for_reducers](,{{'@'@'B " " # V*0)@)@)@)O  &J&&"m1ZL19R9RR  " Y  "  "  " u / " 1}TXXa[F" " 5KKLAyiK K ?U" 1: $V ," "  $$&111O  " "" L" s(C> +C8)C> #D8 D D8C> c |jjDcgc]\}}||jvr||f}}}|Dcgc]\}}| c}}|_|Dcic]\}}|| c}}|_ycc}}wcc}}wcc}}w)a Assign self.module.named_buffers to self.modules_buffers. Assigns module buffers to self.modules_buffers which are then used to broadcast across ranks when broadcast_buffers=True. Note that this must be called every time buffers need to be synced because buffers can be reassigned by user module, see https://github.com/pytorch/pytorch/issues/63916. N)r named_buffersrmodules_buffersnamed_module_buffers)r buffer_namebufferrs r4rz/DistributedDataParallel._assign_modules_buffersCs(,{{'@'@'B # V$";";;[ !  1E , F  >R% $9V[K % !  % sA3 A9 A?c tt|Dcic]}||| }}t|}i}|jj D]j\}}|j dD]P\}} |d|} | |j vs| js)| |vr|jtd| d|| } | || <Rlt|t|k7r/|jtdt|dt|d|Scc}w) NFrrzParam with name zt found in module parameters, but not DDP parameters. This indicates a bug in DDP, please report an issue to PyTorch.zUExpected param to name mapping to cover all parameters, but got conflicting lengths: z vs zA. This indicates a bug in DDP, please report an issue to PyTorch.) rangerrrrrrrBr r) rrCrparam_to_param_index param_setparam_index_to_param_fqnrrrrFfqnrbs r4r;z:DistributedDataParallel._build_debug_param_to_name_mapping[sC:?J:PQQ 1 q 0QQ O #% #';;#<#<#> @ K%+%<%S!9: :   114Y0@3456;; ('= Rs Dc#jKd}|r|jn|gD]}||Ed{y7w)z(Return a generator of module parameters.c3Kt|dr|jjn|jd}|Ed{y7w)N_former_parametersFr)r8rrfrC)rpss r4model_parameterszADistributedDataParallel._get_parameters..model_parameterssC123$$++-\\%\0  MMs>AAAN)rC)rrrrmods r4_get_parametersz'DistributedDataParallel._get_parameters|s9 #*199;s -C', , , - ,s '313cd} |jtk7rd}|r|jtdyy#t$rd}Y'wxYw)NFTzDDP Pickling/Unpickling are only supported when using DDP with the default process group. That is, when you have called init_process_group and have not passed process_group argument to DDP constructor)rrrr )rpickle_not_supporteds r4rz,DistributedDataParallel._check_default_groups\$ (!!%7%99'+$   <   (#'  (s7 AAc#bK|j}d|_ d||_y#||_wxYww)a0 Context manager to disable gradient synchronizations across DDP processes. Within this context, gradients will be accumulated on module variables, which will later be synchronized in the first forward-backward pass exiting the context. Example:: >>> # xdoctest: +SKIP("undefined variables") >>> ddp = torch.nn.parallel.DistributedDataParallel(model, pg) >>> with ddp.no_sync(): >>> for input in inputs: >>> ddp(input).backward() # no synchronization, accumulate grads >>> ddp(another_input).backward() # synchronize grads .. warning:: The forward pass should be included inside the context manager, or else gradients will still be synchronized. FN)r')rold_require_backward_grad_syncs r4no_synczDistributedDataParallel.no_syncs6,*.)H)H&*/' M .LD +.LD +s/#/ ,/c|jS)zL`TorchDynamo` requires DDP's status and module for cooperative optimization.)r)clss r4_get_active_ddp_modulez.DistributedDataParallel._get_active_ddp_modules%%%r3) recursivec#bK|t_ ddt_y#dt_wxYwwrI)r%rrs r4_inside_ddp_forwardz+DistributedDataParallel._inside_ddp_forwards+6:2 > 9= # 6 # 6s / / ,/c|jr|j|i|S|j5|j|i|cdddS#1swYyxYwrI)r rr )rrrs r4_run_ddp_forwardz(DistributedDataParallel._run_ddp_forwardsU  # #4;;1&1 1))+ 6"t{{F5f5 6 6 6s A  AcF|jtd|jD}t|jD]C\}}|j|j ||_|r*|jj E|r|jj yyy)Nc38K|]}|jduywrI)re)rXrFs r4r[z=DistributedDataParallel._clear_grad_buffer..s&', d"&s)r5allrrqrer6zero_)rall_param_grad_nonerurFs r4_clear_grad_bufferz*DistributedDataParallel._clear_grad_buffers  " " .#&&040M0M&# !*$*G*G H + u::%!%!7!7!>EJ. ((*  + #''--/# /r3c2|jd|_yr)rrLrs r4 _lazy_initz"DistributedDataParallel._lazy_inits **,"r3c |jr||fS|js.tjj s|j |j r||fStjrN|jrB|jJ|jj|jjtj|}|r&|jj||j tjr6|jj#rtj%dd|_|j)r|j+|j,j.r|j1d|j2rt5||tj6|j8|j2d|j:\}}|d|d}}|j<&t?|j<j@g|i|\}}||fS|j<&t?|j<j@g|i|\}}||fS)Nz4Reducer buckets have been rebuilt in this iteration.TFrr)!r rLr/compiler is_compilingrr7is_grad_enabledr'rset_runtime_stats_and_logrprepare_for_forwardrnotify_join_context_set_forward_pass_work_handlerrrrJ_check_sync_bufs_pre_fwd _sync_buffers _join_configenablerrr!rAr$r4rrr()rrrwork moved_inputs moved_kwargsrs r4 _pre_forwardz$DistributedDataParallel._pre_forwards  # #6> !""5>>+F+F+H OO   , ,6> !  "t'F'F;;* ** KK 1 1 3 LL , , .''-  LL 6 622   "t||'D'D'F KKN O(,D %  ( ( *       # #  : :% : P ??)3 T--tq/AB66 * &L, (?LO&D##/3((44    f < ##/!5((44""" 6> !r3c|jr|S|jr|j|S|jr|j t j ru|jrid|_|jr:|js.|jjtt|n#|jjgnd|_|jr |jr|jr|jst!|\}}}t#t%|Dcgc]}d}}t'|D]-\}}t j(|s|j*)|||</t-j.t1j2|g|}t#t%|D]}|| ||||<t5|||}|j|Scc}w)NTF)r r7r_check_sync_bufs_post_fwdrr/rr'rrrrprepare_for_backwardr_r]rrQrrrq is_tensorrrapplyrFrGrS) rrMrOrPrNr~output_placeholdersrpassthrough_tensor_lists r4 _post_forwardz%DistributedDataParallel._post_forward2s  # #M  , ,  # # %M  ) ) +     "t'F'F.2D + **43D3D 11$}V7L2MN 11"5.3D +  ' '0A0A   d&Q&Q (/  "$C(:$;<AA A ''9: 4 6??6*v~~/E-3'* 4'/nn D!'#' #3234 H&q)1-DQ-G'* H /#X~F ! ;As2 G'c@tjjjd5|j|i|\}}|j r|j j|i|n|j|i|}|j|cdddS#1swYyxYw)NzDistributedDataParallel.forward) r/autogradprofilerrecord_functionr#r7rrr r+)rrrrMs r4rzDistributedDataParallel.forwardws ^^ $ $ 4 45V W ..T..A&ANFF44$ ##V6v6*T**F=f=  %%f- . . .s A BBc4t||||jSN)r&)rr&)rrrrs r4scatterzDistributedDataParallel.scattersffjdhhGGr3cnt||tj|j||jSrI)r!r/rAr$r4)rrr device_ids r4 to_kwargsz!DistributedDataParallel.to_kwargss2   LL))9 5  2 2   r3c2t|||jSr1)rr&)routputsr%s r4rzDistributedDataParallel.gathersg}$((;;r3c&t|||SrI)rtrain)rmoders r4r9zDistributedDataParallel.trains  d r3c8|s.|jr"tjd|j}n!tjd|j}t j ||jd}|r%|j|jdk7}|Sy)NrrTrwr) r'r/onesrArrzrgrritem)rrrequires_sync_tensorr rs r4rzADistributedDataParallel._check_global_requires_backward_grad_syncs}$"A"A#(::a #D #(;;q#E  (:(:T   IIK$8$=$=$?1$D !( (r3c|jr.|j|jd}|j|yyr)r_find_common_rank_distributed_rank_sync_module_buffersrauthoritative_ranks r4rz6DistributedDataParallel._check_and_sync_module_bufferss<  ( ( *!%!7!78N8NPU!V   % %&8 9 +r3c|j|j||_t|j|j |j |j|j|jy)Nr) r@rA_authoritative_rankr rrrrrrs r4rz)DistributedDataParallel._sync_final_models`$(#9#9  " "N$   ;;,,"&"<"<(()-)B)B"44  r3cg}|jj}|D].}|jj|}|j|0|D]}|j yrI)r_get_zeros_like_grad_buckets_run_comm_hookrr)r comm_work grad_buckets grad_bucketr s r4rz6DistributedDataParallel._match_all_reduce_for_bwd_passsh ||@@B ' #K <<..{;D   T "  # D IIK r3cn|jj}|jj|yrI)r_get_local_used_mapr allreduce)rlocally_used_param_maps r4rz6DistributedDataParallel._match_unused_params_allreduces*!%!A!A!C $$%;>> # xdoctest: +SKIP("Distributed") >>> import torch >>> import torch.distributed as dist >>> import os >>> import torch.multiprocessing as mp >>> import torch.nn as nn >>> # On each spawned worker >>> def worker(rank): >>> dist.init_process_group("nccl", rank=rank, world_size=2) >>> torch.cuda.set_device(rank) >>> model = nn.Linear(1, 1, bias=False).to(rank) >>> model = torch.nn.parallel.DistributedDataParallel( >>> model, device_ids=[rank], output_device=rank >>> ) >>> # Rank 1 gets one more input than rank 0. >>> inputs = [torch.tensor([1]).float() for _ in range(10 + rank)] >>> with model.join(): >>> for _ in range(5): >>> for inp in inputs: >>> loss = model(inp).sum() >>> loss.backward() >>> # Without the join() API, the below synchronization will hang >>> # blocking for rank 1's allreduce to complete. >>> torch.cuda.synchronize(device=rank) r)r)rrrrQs r4joinzDistributedDataParallel.joins F F  &)E   r3c @|jdd}t||S)a DDP join hook enables training on uneven inputs by mirroring communications in forward and backward passes. Arguments: kwargs (dict): a :class:`dict` containing any keyword arguments to modify the behavior of the join hook at run time; all :class:`Joinable` instances sharing the same join context manager are forwarded the same value for ``kwargs``. The hook supports the following keyword arguments: divide_by_initial_world_size (bool, optional): If ``True``, then gradients are divided by the initial world size that DDP was launched with. If ``False``, then gradients are divided by the effective world size (i.e. the number of non-joined processes), meaning that the uneven inputs contribute more toward the global gradient. Typically, this should be set to ``True`` if the degree of unevenness is small but can be set to ``False`` in extreme cases for possibly better results. Default is ``True``. rTrS)r3r)rrrs r4 join_hookz!DistributedDataParallel.join_hookJs*2(.zz2PRV'W$ /K  r3c|jSrIrrs r4 join_devicez#DistributedDataParallel.join_devicehs {{r3c|jSrI)rrs r4join_process_groupz*DistributedDataParallel.join_process_groupls!!!r3ricDt|sJt||||_y)ab Allow custom registration of hooks that define how buffer are synchronized across ranks. The hook takes in an optional state and is passed in a Dict[str, Tensor] corresponding to buffer names and the buffers, and can run arbitrary reductions on buffers as opposed to DDP's default broadcast from rank 0. This is useful for example if a counter needs to be summed or averaged across ranks every iteration. Args: state (Any): Optional state that is passed to the hook. hook (Callable): Callable with the following signature: ``hook(state: object, bucket: dist.GradBucket) -> torch.futures.Future[torch.Tensor]`` comm_hook_location (_BufferCommHookLocation): Enum value indicating where to run the hook. _BufferCommHookLocation.PRE_FORWARD means that the hook will run _before_ the forward pass, and _BufferCommHookLocation.POST_FORWARD means that the hook will run _after_ the forward pass. NOTE: To maximize performance, users can return a List[torch.futures.Future] from their hook, and DDP will install and await these hooks appropriately at the end of the backward pass. This will ensure all buffers are synchronized by the end of the backward pass. If this setting is used, it is recommended to pass comm_hook_location=_BufferCommHookLocation.POST_FORWARD, which will trigger the hook after the forward pass. If _BufferCommHookLocation.PRE_FORWARD is used, users must ensure appropriate synchronization when manipulating GPU buffers in the forward pass. )rrrN)callabler buffer_hook)rrjricomm_hook_locations r4_register_buffer_comm_hookz2DistributedDataParallel._register_buffer_comm_hookps'J~~*!#(&8 r3rjc|j||jJ|jj|j|jj ||ft j|j||y)a Register communication hook for user-defined DDP aggregation of gradients across multiple workers. This hook would be very useful for researchers to try out new ideas. For example, this hook can be used to implement several algorithms like GossipGrad and gradient compression which involve different communication strategies for parameter syncs while running Distributed DataParallel training. Args: state (object): Passed to the hook to maintain any state information during the training process. Examples include error feedback in gradient compression, peers to communicate with next in GossipGrad, etc. It is locally stored by each worker and shared by all the gradient tensors on the worker. hook (Callable): Callable with the following signature: ``hook(state: object, bucket: dist.GradBucket) -> torch.futures.Future[torch.Tensor]``: This function is called once the bucket is ready. The hook can perform whatever processing is needed and return a Future indicating completion of any async work (ex: allreduce). If the hook doesn't perform any communication, it still must return a completed Future. The Future should hold the new value of grad bucket's tensors. Once a bucket is ready, c10d reducer would call this hook and use the tensors returned by the Future and copy grads to individual parameters. Note that the future's return type must be a single tensor. We also provide an API called ``get_future`` to retrieve a Future associated with the completion of ``c10d.ProcessGroup.Work``. ``get_future`` is currently supported for NCCL and also supported for most operations on GLOO and MPI, except for peer to peer operations (send/recv). .. warning :: Grad bucket's tensors will not be predivided by world_size. User is responsible to divide by the world_size in case of operations like allreduce. .. warning :: DDP communication hook can only be registered once and should be registered before calling backward. .. warning :: The Future object that hook returns should contain a single tensor that has the same shape with the tensors inside grad bucket. .. warning :: ``get_future`` API supports NCCL, and partially GLOO and MPI backends (no support for peer-to-peer operations like send/recv) and will return a ``torch.futures.Future``. Example:: Below is an example of a noop hook that returns the same tensor. >>> # xdoctest: +SKIP('undefined name') >>> def noop(state: object, bucket: dist.GradBucket) -> torch.futures.Future[torch.Tensor]: >>> fut = torch.futures.Future() >>> fut.set_result(bucket.buffer()) >>> return fut >>> ddp.register_comm_hook(state=None, hook=noop) Example:: Below is an example of a Parallel SGD algorithm where gradients are encoded before allreduce, and then decoded after allreduce. >>> # xdoctest: +SKIP('undefined name') >>> def encode_and_decode(state: object, bucket: dist.GradBucket) -> torch.futures.Future[torch.Tensor]: >>> encoded_tensor = encode(bucket.buffer()) # encode gradients >>> fut = torch.distributed.all_reduce(encoded_tensor).get_future() >>> # Define the then callback to decode. >>> def decode(fut): >>> decoded_tensor = decode(fut.value()[0]) # decode gradients >>> return decoded_tensor >>> return fut.then(decode) >>> ddp.register_comm_hook(state=None, hook=encode_and_decode) N) _check_comm_hookr_set_comm_hook_namer-r=rrz_register_comm_hookr)rrjris r4rHz*DistributedDataParallel.register_comm_hooksiV d#{{&&& ''(9(9: u .   ud;r3c|jJ|jjt|tj|j |y)a Register a built-in communication hook that specifies how DDP aggregates gradients across multiple workers. The built-in hooks aim to provide efficient C++ implementations for certain hooks, which might not be as efficient if implemented in Python using a Python communication hook. Args: comm_hook_type (dist.BuiltinCommHookType): type of communication hook, such as ALLREDUCE, FP16_COMPRESS, etc. .. warning :: DDP communication hook can only be registered once and should be registered before calling backward. Example:: Below is an example of a FP16 compression where gradients are compressed into 16-bit floating-point numbers before allreduce, and then decompressed after allreduce. >>> # xdoctest: +SKIP('undefined name') >>> ddp._register_builtin_comm_hook(dist.BuiltinCommHookType.FP16_COMPRESS) N)rrbrrz_register_builtin_comm_hookr)rcomm_hook_types r4rez3DistributedDataParallel._register_builtin_comm_hooks?.{{&&& ''N(;< ((~Fr3) optim_paramsrcddlm}|||g|i|} |j|y#t$r}t |d|d|d}~wwxYw)a Register an optimizer in DDP to optimize parameter immediately after its gradient reduction. Registers an optimizer with DDP such that the optimization for a parameter will run immediately when that parameter's gradient is finished with reduction, instead of waiting for all parameters' gradients to finish reduction. This can result in a training speedup depending on your workload since the optimizer can run while gradient reduction for other parameters are still ongoing. In addition, this has the potential to reduce peak memory consumption during training, as it only needs to load the per-parameter optimizer states of a single parameter at a time, instead of loading all per-parameter optimizer states at once. Args: optim (Type): a ``torch.optim.Optimizer`` class to be registered as a fused optimizer. *args (Sequence[Any]): Arguments to forward to `optim`. optim_params (Optional[Iterable[torch.Tensor]]): Set of parameters to optimize, similar to `params` argument of traditional `torch.optim` Optimizers. If this is omitted, all DDP model parameters will be optimized. **kwargs: (Dict[str, Any]): Keyword arguments to forward to `optim`. .. warning :: _register_fused_optim should only be called once on a DDP instance, and registering multiple fused optimizers for the same DDP model is not currently supported. Please ping https://github.com/pytorch/pytorch/issues/71595 if this is necessary for your use case. .. warning :: _register_fused_optim and register_comm_hook currently do not compose together, meaning that custom DDP communication hooks are not supported with overlapped optimizers. Please ping https://github.com/pytorch/pytorch/issues/71595 if this is necessary for your use case. .. warning :: Gradient accumulation and DDP `no_sync` are currently not supported with overlapped optimizer. Please ping https://github.com/pytorch/pytorch/issues/71595 if this is necessary for your use case. Example:: >>> # xdoctest: +SKIP("No rendezvous handler") >>> torch.distributed.init_process_group(backend='nccl', world_size=4, init_method='...') >>> net = torch.nn.parallel.DistributedDataParallel(model, pg) >>> lr = 1e-2 >>> betas = (0.9, 0.99) >>> eps = 1e-6 >>> net._register_fused_optim(torch.optim.Adam, lr, betas=betas, eps=eps) >>> # Example with subset of parameters >>> params_to_opt = [list(net.parameters())[0]] >>> net._register_fused_optim( ... torch.optim.Adam, lr, optim_params=params_to_opt, betas=betas, eps=eps ... ) r)_as_overlapped_optimz] does not support overlapped DDP. Please file an issue to PyTorch or the respective owner of rN)/torch.distributed.algorithms._optimizer_overlapri register_ddpNotImplementedErrorr)rrrgrrrioverlapped_optimes r4_register_fused_optimz-DistributedDataParallel._register_fused_optimsg| Y/|UdUfU   ) )$ /" 'vw|v}}~  s' AAAcHtj|j|||yrI)rzrr)rtensors buffer_sizerDs r4 _distributed_broadcast_coalescedz8DistributedDataParallel._distributed_broadcast_coalescedPs" !!   6H r3c|jxr5t|dxr'|jjtj k(SNr])will_sync_module_buffersr8r]rrrrs r4r%z1DistributedDataParallel._check_sync_bufs_post_fwdWsD  ) ) + 4m, 4  ::&334 r3c|jxr6t|d xs'|jjtj k(Sru)rvr8r]rrrrs r4rz0DistributedDataParallel._check_sync_bufs_pre_fwd_sG,,. m, , 399&223 r3cj|jxr&|jxrt|jdkDS)Nr)rrrrrs r4rvz0DistributedDataParallel.will_sync_module_buffersfs6  + + .&& .D(()A- r3c tj|r|ndg|j}tj|t j |j|jdk(r|jtd|jS)Nrr)oprxzuBUG! Expected rank_cond to be true for at least one process. This indicates a bug in PyTorch, please report an issue.) r/tensorrArzrgrMAXrr=r r)r input_rank rank_cond rank_to_uses r4r@z)DistributedDataParallel._find_common_rankmsxll$Z" -;;    Dct|ds|j|y|jj}|jj}|||j }||j j|yy)Nr])rD)r8_default_broadcast_coalescedr]rrrr_install_post_backward_futures)rrDrirjfutss r4rBz,DistributedDataParallel._sync_module_bufferssot]+  - -AS - T##44D$$;;Et889D ;;DA r3cb| |j}| |j}|j|||y)z Broadcasts buffers from rank 0 to rest of workers. If bufs, bucket_size are None, default values self.modules_buffers and self.broadcast_bucket_size are used instead. N)rrrs)rbufs bucket_sizerDs r4rz4DistributedDataParallel._default_broadcast_coalesceds8 <''D  44K --dKASTr3c|jD]W}t|tjjjs2|j dk(sB|j tdYy)Nrz/SyncBatchNorm layers only work with GPU modules)rCrKr/r-rr$r r)rrlayers r4rz6DistributedDataParallel._passing_sync_batchnorm_handlesT^^% E%!1!1!?!?@##u,''"I r3c*t|s|jtdtj|}|j dj tjk7r@|j dj tjk7r|jtd|jtjk7rN|jtjjtjk7r|jtd|j dvrtj"j$duxstj"j&du}tj(xrAtj*xr+tj$j,j#dk\}tj(xr4tj.xrtj0j)}|r|s|s|jtdyyyy)Nz$Communication hook must be callable.bucketz@Communication hook: bucket annotation should be dist.GradBucket.zSCommunication hook: return annotation should be torch.futures.Future[torch.Tensor].)bf16_compress_hookbf16_compress_wrapper_hook)r zSBF16 all reduce communication hook required CUDA 11+ and NCCL 2.10+ or XPU and XCCL)r\r  TypeErrorinspect signaturerC annotation_emptyrz GradBucketrreturn_annotationr/futuresFuturer^r+versioncudahip is_availableis_nccl_availablencclis_xccl_availablexpu)rrisigcuda_supportednccl_supportedxpu_xccl_supporteds r4raz(DistributedDataParallel._check_comm_hooks~    +Q R% NN8 $ / /7>> Ax(33tF   R   ! !W^^ 3%%)=)=ell)KK   e  ==P P ""$./""$. !!#9**,9JJOO++-8  !!#-**,-II**,  $;M##i(4F   9P9P9R E5.1  s2<<c||_|jD]\}}||vs d|_|jD]\}}||vs d|_y)a; Set parameters and buffers to be ignored by DDP. Expected format for parameters is the fully qualified name: {module_name}.{param_name}, and similarly, {module_name}.{buffer_name} for buffers. For example: params_to_ignore = [] # NB: model here is vanilla PyTorch module, not yet wrapped with DDP. for module_name, module in model.named_modules(): for param_name, param in module.named_parameters(recurse=False): if should_ignore(param): # Create expected format fqn = f"{module_name}.{param_name}" params_to_ignore.append(fqn) torch.nn.parallel.DistributedDataParallel._set_params_and_buffers_to_ignore_for_model( model, params_to_ignore ) TN)rrr6r)rrrWrFrs r4+_set_params_and_buffers_to_ignore_for_modelzCDistributedDataParallel._set_params_and_buffers_to_ignore_for_modelsi24P0!224 *KD%33%)" *#002 +LD&33&*# +r3c|jJ|jj}i|j|jS)aB Return a dictionary of logging data for debugging and analysis. This interface can be called after DistributedDataParallel() is constructed. It returns a dictionary of logging data. It could help for debugging and analysis. The logging data includes DistributedDataParallel constructor input parameters, some internal states of DistributedDataParallel and performance metrics. Simply print the dictionary and see what these metrics are. This is a prototype interface and subject to change in the future. )r_get_ddp_logging_datastrs_mapints_map)rddp_logging_datas r4rz-DistributedDataParallel._get_ddp_logging_data sE{{&&&;;<<>I"++I/?/H/HIIr3cp|dkr|jtd|jj|y)ae Set sample_rate of collecting runtime stats. This interface allows users to set sample_rate of collecting runtime stats. The runtime stats will be recorded for the first 10 iterations, after 10 iterations runtime stats will be recorded once every "sample_rate" training iterations. In default, runtime stats are recorded for the first 10 iterations, after 10 iterations runtime stats are recorded once every "kDDPRuntimeLoggingSampleRate=100" training iterations. This is a prototype interface and subject to change in the future. rzADDP runtime logging sample rate should be equal or greater than 1N)r rr$_set_ddp_runtime_logging_sample_rate)r sample_rates r4rz,>60*# D"LCJ.H < 0: $*= .2+0 h &*h h %) h T <""3?? * * XO<OrGrQrSr]rrrrrr%r2r3r4rs>  *%7799 *-FF#C< 4 88M*1 % %   8 $ #/#/ #/TE)(8,7td  77 7%x%B13813hu #fhu #r3