import torch
import torch.distributed as dist
from torch.autograd import Function

# The two imports below are not always available depending on the
# USE_DISTRIBUTED compile flag. Make sure they raise an import error
# if we're trying to use them.
from torch.distributed import group, ReduceOp


def broadcast(tensor, src, group=group.WORLD):
    """
    Broadcasts the tensor to the whole group.

    ``tensor`` must have the same number of elements in all processes
    participating in the collective.

    Arguments:
        tensor (Tensor): Data to be sent if ``src`` is the rank of current process.
        src (int): Source rank.
        group (ProcessGroup, optional): The process group to work on.

    Returns:
        Tensor: Received tensor from the broadcast op.
    """
    return _Broadcast.apply(src, group, tensor)


def gather(tensor, dst=0, group=group.WORLD):
    """
    Gathers a list of tensors in a single process.

    Arguments:
        tensor (Tensor): Input tensor.
        dst (int, optional): Destination rank (default is 0).
        group (ProcessGroup, optional): The process group to work on.

    Returns:
        tuple[Tensor]: List of appropriately-sized tensors with the gathered data.
    """
    return _Gather.apply(dst, group, tensor)


def scatter(tensors, src=0, group=group.WORLD):
    """
    Scatters a list of tensors to all processes in a group.

    Each process will receive exactly one tensor and store its data in the
    ``tensor`` argument.

    Arguments:
        tensors (list[Tensor]): List of tensors to scatter on the source rank.
            Receivers must pass ``None``.
        src (int, optional): Source rank (default is 0).
        group (ProcessGroup, optional): The process group to work on.

    Returns:
        Tensor: Output tensor from the scatter operation.
    """
    return _Scatter.apply(src, group, *tensors)


def reduce(tensor, dst, op=ReduceOp.SUM, group=group.WORLD):
    """
    Reduces the tensor data across all machines.

    Only the process with rank ``dst`` is going to receive the final result.

    Arguments:
        tensor (Tensor): Input of the collective.
        dst (int): Destination rank.
        op (optional): One of the values from
            ``torch.distributed.ReduceOp`` enum. Specifies an operation used for
            element-wise reductions.
        group (ProcessGroup, optional): The process group to work on.

    Returns:
        Tensor: Output of the collective.
    """
    return _Reduce.apply(dst, op, group, tensor)


def reduce_scatter(output, input_list, op=ReduceOp.SUM, group=group.WORLD):
    """
    Reduces, then scatters a list of tensors to all processes in a group.

    Arguments:
        output (Tensor): Output tensor.
        input_list (list[Tensor]): List of tensors to reduce and scatter.
        op (optional): One of the values from
            ``torch.distributed.ReduceOp`` enum. Specifies an operation used for
            element-wise reductions.
        group (ProcessGroup, optional): The process group to work on.

    Returns:
        Tensor: Output of the collective.
    """
    return _Reduce_Scatter.apply(op, group, output, *input_list)


def all_gather(tensor, group=group.WORLD):
    """
    Gathers tensors from the whole group in a list.

    Arguments:
        tensor (Tensor): Tensor to be broadcast from current process.
        group (ProcessGroup, optional): The process group to work on.

    Returns:
        tuple([Tensor]): Output of the collective.
    """
    return _AllGather.apply(group, tensor)


def _all_gather_base(output_tensor, input_tensor, group=group.WORLD):
    """
    Single tensor all gather. Gathers a single tensor from all ranks, and puts them in a single output tensor.

    Args:
        output_tensor (Tensor): Output tensor. It should contain
            correctly-sized tensors to be used for output of the collective.
        input_tensor (Tensor): Tensor to be broadcast from current process.
        group (ProcessGroup, optional): The process group to work on. If None,
            the default process group will be used.

    Examples:
        >>> # All tensors below are of torch.int64 dtype.
        >>> # We have 2 process groups, 2 ranks.
        >>> # xdoctest: +SKIP("incorrect want text")
        >>> output_tensor = torch.zeros(2, dtype=torch.int64)
        >>> output_tensor
        [tensor([0, 0])] # Rank 0 and 1
        >>> tensor = torch.arange(1, dtype=torch.int64) + 1 + rank
        >>> tensor
        tensor([1]) # Rank 0
        tensor([2]) # Rank 1
        >>> dist.all_gather_base(output_tensor, tensor)
        >>> output_tensor
        tensor([1,2]) # Rank 0
        tensor([1,2]) # Rank 1

    .. warning::
        `_all_gather_base` is experimental and subject to change.
        It is the caller's responsibility to ensure the output_tensor
        is correctly sized.
    """
    return _AllGatherBase.apply(output_tensor, input_tensor, group)


def all_to_all(output_tensor_list, input_tensor_list, group=group.WORLD):
    """
    Each process scatters a list of input tensors to all processes in a group
    and returns a gathered list of tensors in the output list.

    Arguments:
        output_tensor_list (list[Tensor]): List of tensors to gather, one per rank.
        input_tensor_list (list[Tensor]): List of tensors to scatter, one per rank.
        group (ProcessGroup, optional): The process group to work on.

    Returns:
        tuple([Tensor]): Output of the collective.
    """
    return _AlltoAll.apply(group, output_tensor_list, *input_tensor_list)


def all_to_all_single(
    output,
    input,
    output_split_sizes=None,
    input_split_sizes=None,
    group=group.WORLD,
):
    """
    Each process splits the input tensor and then scatters the split list to all processes in a group.

    Then concatenate the received tensors from all processes in the group and
    return a single output tensor.

    Arguments:
        output (Tensor): Gathered concatenated output tensor.
        input (Tensor): Input tensor to scatter.
        output_split_sizes (list[Int], optional): Output split sizes for dim 0.
            If specified None or empty, dim 0 of ``output`` tensor must divide
            equally by ``world_size``.
        input_split_sizes (list[Int], optional): Input split sizes for dim 0.
            If specified None or empty, dim 0 of ``input`` tensor must divide
            equally by ``world_size``.
        group (ProcessGroup, optional): The process group to work on.

    Returns:
        Tensor: Output of the collective.
    """
    return _AlltoAllSingle.apply(
        group, output, output_split_sizes, input_split_sizes, input
    )


def all_reduce(tensor, op=ReduceOp.SUM, group=group.WORLD):
    """
    Reduces the tensor data across all machines in such a way that all get the final result.

    After the call the returned tensor is going to be bitwise identical
    in all processes.

    Arguments:
        tensor (Tensor): Input of the collective.
        op (optional): One of the values from
            ``torch.distributed.ReduceOp`` enum. Specifies an operation used for
            element-wise reductions.
        group (ProcessGroup, optional): The process group to work on.

    Returns:
        Tensor: Output of the collective.
    """
    return _AllReduce.apply(op, group, tensor)


class _Broadcast(Function):
    @staticmethod
    def forward(ctx, src, group, tensor):
        ctx.src = src
        ctx.group = group
        ctx.rank = dist.get_rank(group=group)
        # torch.distributed makes all the calls in place
        # we allocate new tensors to avoid this
        tensor = tensor.clone()
        dist.broadcast(tensor, src, group=group)
        return tensor

    @staticmethod
    def backward(ctx, grad_output):
        gx = _Reduce.apply(ctx.src, ReduceOp.SUM, ctx.group, grad_output)
        if ctx.src != ctx.rank:
            gx.zero_()
        return (None, None, gx)


class _Gather(Function):
    @staticmethod
    def forward(ctx, dst, group, tensor):
        ctx.dst = dst
        ctx.group = group
        # Allocate the aggregation list here; its length comes from the
        # group size, and every entry must match the input tensor's shape.
        tensor_list = [
            torch.zeros_like(tensor) for i in range(dist.get_world_size(group=group))
        ]

        tensor = tensor.contiguous()
        if dist.get_rank(group=group) == dst:
            dist.gather(tensor, tensor_list, dst, group=group)
        else:
            dist.gather(tensor, None, dst, group=group)
        return tuple(tensor_list)

    @staticmethod
    def backward(ctx, *grad_outputs):
        return (None, None) + (_Scatter.apply(ctx.dst, ctx.group, *grad_outputs),)


class _Scatter(Function):
    @staticmethod
    def forward(ctx, src, group, *tensors):
        ctx.src = src
        ctx.group = group
        assert all(t.size() == tensors[0].size() for t in tensors)
        output = torch.zeros_like(tensors[0])
        if dist.get_rank(group=group) == src:
            dist.scatter(output, list(tensors), src, group=group)
        else:
            dist.scatter(output, None, src, group=group)
        return output

    @staticmethod
    def backward(ctx, grad_output):
        return (None, None) + _Gather.apply(ctx.src, ctx.group, grad_output)


class _Reduce(Function):
    @staticmethod
    def forward(ctx, src, op, group, tensor):
        ctx.src = src
        ctx.group = group
        tensor = tensor.clone()
        dist.reduce(tensor, src, op=op, group=group)
        return tensor

    @staticmethod
    def backward(ctx, grad_output):
        return (None, None, None) + (
            _Broadcast.apply(ctx.src, ctx.group, grad_output),
        )


class _Reduce_Scatter(Function):
    @staticmethod
    def forward(ctx, op, group, tensor, *input_tensor_list):
        ctx.group = group
        # Need contiguous tensors for collectives.
        tensor = tensor.contiguous()
        input_tensor_list = tuple(t.contiguous() for t in input_tensor_list)
        dist.reduce_scatter(tensor, list(input_tensor_list), op=op, group=group)
        return tensor

    @staticmethod
    def backward(ctx, grad_output):
        return (None, None, None) + _AllGather.apply(ctx.group, grad_output)


class _AllGather(Function):
    @staticmethod
    def forward(ctx, group, tensor):
        # Need contiguous tensors for collectives.
        tensor = tensor.contiguous()

        ctx.group = group
        out_tensor_list = [
            torch.empty_like(tensor) for _ in range(dist.get_world_size(group=group))
        ]

        dist.all_gather(out_tensor_list, tensor, group=group)
        return tuple(out_tensor_list)

    @staticmethod
    def backward(ctx, *grad_outputs):
        if dist.get_backend(group=ctx.group) is dist.Backend.NCCL:
            rank = dist.get_rank(group=ctx.group)
            gx = torch.empty_like(grad_outputs[rank])
            gx = _Reduce_Scatter.apply(ReduceOp.SUM, ctx.group, gx, *grad_outputs)
        else:
            # Many backends do not support ReduceScatter, so emulate it
            # with AlltoAll followed by a sum.
            tensor_list = [torch.empty_like(tensor) for tensor in grad_outputs]
            gxs = _AlltoAll.apply(ctx.group, tensor_list, *grad_outputs)
            gx = torch.sum(torch.stack(gxs), dim=0)
        return (None, gx)


class _AllGatherBase(Function):
    @staticmethod
    def forward(ctx, output_tensor, input_tensor, group):
        ctx.group = group
        dist._all_gather_base(output_tensor, input_tensor.contiguous(), group=group)
        return output_tensor

    @staticmethod
    def backward(ctx, grad_output):
        if dist.get_backend(group=ctx.group) is dist.Backend.NCCL:
            world_size = dist.get_world_size(group=ctx.group)
            out_size = list(grad_output.size())
            if out_size[0] % world_size != 0:
                raise RuntimeError(
                    f"Tensor with dimensions: {out_size} does "
                    f"not have first dimension divisible by world_size: {world_size}"
                )
            out_size[0] = out_size[0] // dist.get_world_size(group=ctx.group)
            gx = torch.empty(
                out_size, device=grad_output.device, dtype=grad_output.dtype
            )
            dist._reduce_scatter_base(gx, grad_output, ReduceOp.SUM, ctx.group)
        else:
            raise RuntimeError("Backend not supported!")
        return (None, gx, None)


class _AlltoAll(Function):
    @staticmethod
    def forward(ctx, group, out_tensor_list, *tensors):
        ctx.group = group
        ctx.input_tensor_size_list = [
            tensors[i].size() for i in range(dist.get_world_size(group=group))
        ]
        my_rank = dist.get_rank(group=group)
        tensors = tuple(t.contiguous() for t in tensors)
        # Implement it by means of scatter/gather; send/recv async operations have issues.
        if dist.get_backend(group=group) is dist.Backend.GLOO:
            for i in range(dist.get_world_size(group=group)):
                to_send = None
                if i == my_rank:
                    to_send = list(tensors)
                dist.scatter(out_tensor_list[i], to_send, i, group=group)
        else:
            dist.all_to_all(
                out_tensor_list,
                list(tensors),
                group=group,
            )
        return tuple(out_tensor_list)

    @staticmethod
    def backward(ctx, *grad_outputs):
        tensor_list = [
            torch.empty(size, device=grad_outputs[0].device, dtype=grad_outputs[0].dtype)
            for size in ctx.input_tensor_size_list
        ]
        return (None, None) + _AlltoAll.apply(ctx.group, tensor_list, *grad_outputs)


class _AlltoAllSingle(Function):
    @staticmethod
    def forward(ctx, group, output, output_split_sizes, input_split_sizes, input):
        ctx.group = group
        ctx.input_size = input.size()
        ctx.output_split_sizes = input_split_sizes
        ctx.input_split_sizes = output_split_sizes
        dist.all_to_all_single(
            output,
            input,
            output_split_sizes=output_split_sizes,
            input_split_sizes=input_split_sizes,
            group=group,
        )
        return output

    @staticmethod
    def backward(ctx, grad_output):
        tensor = torch.empty(
            ctx.input_size, device=grad_output.device, dtype=grad_output.dtype
        )
        return (None, None, None, None) + (
            _AlltoAllSingle.apply(
                ctx.group,
                tensor,
                ctx.output_split_sizes,
                ctx.input_split_sizes,
                grad_output.contiguous(),
            ),
        )


class _AllReduce(Function):
    @staticmethod
    def forward(ctx, op, group, tensor):
        ctx.group = group
        ctx.op = op
        tensor = tensor.clone(memory_format=torch.contiguous_format)
        dist.all_reduce(tensor, op=op, group=group)
        return tensor

    @staticmethod
    def backward(ctx, grad_output):
        return (None, None) + (_AllReduce.apply(ctx.op, ctx.group, grad_output),)
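The point of these wrappers is that, unlike the in-place `torch.distributed` collectives, they participate in autograd: the backward of each collective is itself a collective (e.g. the gradient of `all_reduce` is all-reduced back to every rank). A minimal sketch of that behavior, assuming a single-process `gloo` group purely for illustration (real jobs launch one process per rank, e.g. via `torchrun`; the master address/port values here are arbitrary):

```python
import os

import torch
import torch.distributed as dist
from torch.distributed.nn.functional import all_reduce

# Single-process "gloo" group for illustration only; with world_size=1
# every collective is an identity, but autograd still routes gradients
# through the differentiable wrapper.
os.environ.setdefault("MASTER_ADDR", "127.0.0.1")
os.environ.setdefault("MASTER_PORT", "29512")  # arbitrary free port
dist.init_process_group("gloo", rank=0, world_size=1)

x = torch.ones(3, requires_grad=True)
y = all_reduce(x)    # differentiable, unlike plain dist.all_reduce
y.sum().backward()   # the gradient is itself all-reduced to every rank
grad = x.grad

dist.destroy_process_group()
```

With more ranks, `y` would hold the element-wise sum across all ranks, and each rank's `x.grad` would accumulate contributions from every rank's loss.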